• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / dgraph / 5892261038

17 Aug 2023 02:30PM UTC coverage: 67.256% (-0.02%) from 67.279%
5892261038

push

web-flow
chore: update cron job frequency to reset github notifications (#8956)

In our [scheduled
runs](https://github.com/dgraph-io/dgraph/actions?query=event%3Aschedule)
we see a number of "ghost" jobs. E.g.
[1](https://github.com/dgraph-io/dgraph/actions/runs/5881148541). This
is probably due to the following sequence of events:
- github actions sends a notification to developer who created /
modified cron job frequency
- if this person is no longer able to receive this notification (i.e. no
longer receiving emails) the action raise this ghost job with an error
about unverified email

This is a bug with github actions. After contacting github support they
suggested changing the cron job frequency in order to reset who receives
the notifications. This may solve the issue.

58589 of 87114 relevant lines covered (67.26%)

2221956.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.28
/dgraph/cmd/zero/oracle.go
1
/*
2
 * Copyright 2017-2023 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package zero
18

19
import (
20
        "context"
21
        "math/rand"
22
        "strconv"
23
        "strings"
24
        "time"
25

26
        "github.com/golang/glog"
27
        "github.com/pkg/errors"
28
        otrace "go.opencensus.io/trace"
29

30
        "github.com/dgraph-io/badger/v4/y"
31
        "github.com/dgraph-io/dgo/v230/protos/api"
32
        "github.com/dgraph-io/dgraph/protos/pb"
33
        "github.com/dgraph-io/dgraph/x"
34
        "github.com/dgraph-io/ristretto/z"
35
)
36

37
// Oracle stores and manages the transaction state and conflict detection.
38
type Oracle struct {
39
        x.SafeMutex
40
        commits map[uint64]uint64 // startTs -> commitTs
41
        // TODO: Check if we need LRU.
42
        keyCommit   *z.Tree // fp(key) -> commitTs. Used to detect conflict.
43
        maxAssigned uint64  // max transaction assigned by us.
44

45
        // All transactions with startTs < startTxnTs return true for hasConflict.
46
        startTxnTs  uint64
47
        subscribers map[int]chan pb.OracleDelta
48
        updates     chan *pb.OracleDelta
49
        doneUntil   y.WaterMark
50
}
51

52
// Init initializes the oracle.
53
func (o *Oracle) Init() {
66✔
54
        o.commits = make(map[uint64]uint64)
66✔
55
        // Remove the older btree file, before creating NewTree, as it may contain stale data leading
66✔
56
        // to wrong results.
66✔
57
        o.keyCommit = z.NewTree("oracle")
66✔
58
        o.subscribers = make(map[int]chan pb.OracleDelta)
66✔
59
        o.updates = make(chan *pb.OracleDelta, 100000) // Keeping 1 second worth of updates.
66✔
60
        o.doneUntil.Init(nil)
66✔
61
        go o.sendDeltasToSubscribers()
66✔
62
}
66✔
63

64
// close releases the memory associated with btree used for keycommit.
65
func (o *Oracle) close() {
66✔
66
        if err := o.keyCommit.Close(); err != nil {
66✔
67
                glog.Warningf("error while closing tree: %v", err)
×
68
        }
×
69
}
70

71
func (o *Oracle) updateStartTxnTs(ts uint64) {
52✔
72
        o.Lock()
52✔
73
        defer o.Unlock()
52✔
74
        o.startTxnTs = ts
52✔
75
        o.keyCommit.Reset()
52✔
76
}
52✔
77

78
// TODO: This should be done during proposal application for Txn status.
79
func (o *Oracle) hasConflict(src *api.TxnContext) bool {
19,854✔
80
        // This transaction was started before I became leader.
19,854✔
81
        if src.StartTs < o.startTxnTs {
19,854✔
82
                return true
×
83
        }
×
84
        for _, k := range src.Keys {
1,075,205✔
85
                ki, err := strconv.ParseUint(k, 36, 64)
1,055,351✔
86
                if err != nil {
1,055,351✔
87
                        glog.Errorf("Got error while parsing conflict key %q: %v\n", k, err)
×
88
                        continue
×
89
                }
90
                if last := o.keyCommit.Get(ki); last > src.StartTs {
1,057,246✔
91
                        return true
1,895✔
92
                }
1,895✔
93
        }
94
        return false
17,959✔
95
}
96

97
func (o *Oracle) purgeBelow(minTs uint64) {
43✔
98
        var timer x.Timer
43✔
99
        timer.Start()
43✔
100

43✔
101
        o.Lock()
43✔
102
        defer o.Unlock()
43✔
103

43✔
104
        // Set startTxnTs so that every txn with start ts less than this, would be aborted.
43✔
105
        o.startTxnTs = minTs
43✔
106

43✔
107
        // Dropping would be cheaper if abort/commits map is sharded
43✔
108
        for ts := range o.commits {
30,713✔
109
                if ts < minTs {
47,877✔
110
                        delete(o.commits, ts)
17,207✔
111
                }
17,207✔
112
        }
113
        timer.Record("commits")
43✔
114

43✔
115
        // There is no transaction running with startTs less than minTs
43✔
116
        // So we can delete everything from rowCommit whose commitTs < minTs
43✔
117
        stats := o.keyCommit.Stats()
43✔
118
        if stats.Occupancy < 50.0 {
80✔
119
                return
37✔
120
        }
37✔
121
        o.keyCommit.DeleteBelow(minTs)
6✔
122
        timer.Record("deleteBelow")
6✔
123
        glog.V(2).Infof("Purged below ts:%d, len(o.commits):%d, keyCommit: [before: %+v, after: %+v].\n",
6✔
124
                minTs, len(o.commits), stats, o.keyCommit.Stats())
6✔
125
        if timer.Total() > time.Second {
6✔
126
                glog.V(2).Infof("Purge %s\n", timer.String())
×
127
        }
×
128
}
129

130
func (o *Oracle) commit(src *api.TxnContext) error {
8,999✔
131
        o.Lock()
8,999✔
132
        defer o.Unlock()
8,999✔
133

8,999✔
134
        if o.hasConflict(src) {
9,038✔
135
                return x.ErrConflict
39✔
136
        }
39✔
137
        // We store src.Keys as string to ensure compatibility with all the various language clients we
138
        // have. But, really they are just uint64s encoded as strings. We use base 36 during creation of
139
        // these keys in FillContext in posting/mvcc.go.
140
        for _, k := range src.Keys {
535,315✔
141
                ki, err := strconv.ParseUint(k, 36, 64)
526,355✔
142
                if err != nil {
526,355✔
143
                        glog.Errorf("Got error while parsing conflict key %q: %v\n", k, err)
×
144
                        continue
×
145
                }
146
                o.keyCommit.Set(ki, src.CommitTs) // CommitTs is handed out before calling this func.
526,355✔
147
        }
148
        return nil
8,960✔
149
}
150

151
func (o *Oracle) currentState() *pb.OracleDelta {
78✔
152
        o.AssertRLock()
78✔
153
        resp := &pb.OracleDelta{}
78✔
154
        for start, commit := range o.commits {
78✔
155
                resp.Txns = append(resp.Txns,
×
156
                        &pb.TxnStatus{StartTs: start, CommitTs: commit})
×
157
        }
×
158
        resp.MaxAssigned = o.maxAssigned
78✔
159
        return resp
78✔
160
}
161

162
func (o *Oracle) newSubscriber() (<-chan pb.OracleDelta, int) {
78✔
163
        o.Lock()
78✔
164
        defer o.Unlock()
78✔
165
        var id int
78✔
166
        for {
156✔
167
                //nolint:gosec // random generator for node id does not require cryptographic precision
78✔
168
                id = rand.Int()
78✔
169
                if _, has := o.subscribers[id]; !has {
156✔
170
                        break
78✔
171
                }
172
        }
173

174
        // The channel takes a delta instead of a pointer as the receiver needs to
175
        // modify it by setting the group checksums. Passing a pointer previously
176
        // resulted in a race condition.
177
        ch := make(chan pb.OracleDelta, 1000)
78✔
178
        ch <- *o.currentState() // Queue up the full state as the first entry.
78✔
179
        o.subscribers[id] = ch
78✔
180
        return ch, id
78✔
181
}
182

183
func (o *Oracle) removeSubscriber(id int) {
78✔
184
        o.Lock()
78✔
185
        defer o.Unlock()
78✔
186
        delete(o.subscribers, id)
78✔
187
}
78✔
188

189
// sendDeltasToSubscribers reads updates from the o.updates
190
// constructs a delta object containing transactions from one or more updates
191
// and sends the delta object to each subscriber's channel
192
func (o *Oracle) sendDeltasToSubscribers() {
66✔
193
        delta := &pb.OracleDelta{}
66✔
194
        ticker := time.NewTicker(time.Second)
66✔
195
        defer ticker.Stop()
66✔
196

66✔
197
        // waitFor calculates the maximum value of delta.MaxAssigned and all the CommitTs of delta.Txns
66✔
198
        waitFor := func() uint64 {
107,165✔
199
                w := delta.MaxAssigned
107,099✔
200
                for _, txn := range delta.Txns {
42,875,058✔
201
                        w = x.Max(w, txn.CommitTs)
42,767,959✔
202
                }
42,767,959✔
203
                return w
107,099✔
204
        }
205

206
        for {
94,056✔
207
        get_update:
93,990✔
208
                var update *pb.OracleDelta
107,116✔
209
                select {
107,116✔
210
                case update = <-o.updates:
93,875✔
211
                case <-ticker.C:
13,175✔
212
                        wait := waitFor()
13,175✔
213
                        if wait == 0 || o.doneUntil.DoneUntil() < wait {
26,301✔
214
                                goto get_update
13,126✔
215
                        }
216
                        // Send empty update.
217
                        update = &pb.OracleDelta{}
49✔
218
                }
219
        slurp_loop:
93,924✔
220
                for {
189,222✔
221
                        delta.MaxAssigned = x.Max(delta.MaxAssigned, update.MaxAssigned)
95,298✔
222
                        delta.Txns = append(delta.Txns, update.Txns...)
95,298✔
223
                        select {
95,298✔
224
                        case update = <-o.updates:
1,374✔
225
                        default:
93,924✔
226
                                break slurp_loop
93,924✔
227
                        }
228
                }
229
                // No need to sort the txn updates here. Alpha would sort them before
230
                // applying.
231

232
                // Let's ensure that we have all the commits up until the max here.
233
                // Otherwise, we'll be sending commit timestamps out of order, which
234
                // would cause Alphas to drop some of them, during writes to Badger.
235
                if o.doneUntil.DoneUntil() < waitFor() {
113,132✔
236
                        continue // The for loop doing blocking reads from o.updates.
19,208✔
237
                        // We need at least one entry from the updates channel to pick up a missing update.
238
                        // Don't goto slurp_loop, because it would break from select immediately.
239
                }
240

241
                if glog.V(3) {
74,716✔
242
                        glog.Infof("DoneUntil: %d. Sending delta: %+v\n", o.doneUntil.DoneUntil(), delta)
×
243
                }
×
244
                o.Lock()
74,716✔
245
                for id, ch := range o.subscribers {
227,062✔
246
                        select {
152,346✔
247
                        case ch <- *delta:
152,346✔
248
                        default:
×
249
                                close(ch)
×
250
                                delete(o.subscribers, id)
×
251
                        }
252
                }
253
                o.Unlock()
74,716✔
254
                delta = &pb.OracleDelta{}
74,716✔
255
        }
256
}
257

258
func (o *Oracle) updateCommitStatusHelper(index uint64, src *api.TxnContext) bool {
23,210✔
259
        o.Lock()
23,210✔
260
        defer o.Unlock()
23,210✔
261
        if _, ok := o.commits[src.StartTs]; ok {
23,845✔
262
                return false
635✔
263
        }
635✔
264
        if src.Aborted {
27,996✔
265
                o.commits[src.StartTs] = 0
5,421✔
266
        } else {
22,575✔
267
                o.commits[src.StartTs] = src.CommitTs
17,154✔
268
        }
17,154✔
269
        return true
22,575✔
270
}
271

272
func (o *Oracle) updateCommitStatus(index uint64, src *api.TxnContext) {
23,210✔
273
        // TODO: We should check if the tablet is in read-only status here.
23,210✔
274
        if o.updateCommitStatusHelper(index, src) {
45,785✔
275
                delta := new(pb.OracleDelta)
22,575✔
276
                delta.Txns = append(delta.Txns, &pb.TxnStatus{
22,575✔
277
                        StartTs:  src.StartTs,
22,575✔
278
                        CommitTs: o.commitTs(src.StartTs),
22,575✔
279
                })
22,575✔
280
                o.updates <- delta
22,575✔
281
        }
22,575✔
282
}
283

284
func (o *Oracle) commitTs(startTs uint64) uint64 {
33,875✔
285
        o.RLock()
33,875✔
286
        defer o.RUnlock()
33,875✔
287
        return o.commits[startTs]
33,875✔
288
}
33,875✔
289

290
func (o *Oracle) storePending(ids *pb.AssignedIds) {
65,228✔
291
        // Wait to finish up processing everything before start id.
65,228✔
292
        max := x.Max(ids.EndId, ids.ReadOnly)
65,228✔
293
        if err := o.doneUntil.WaitForMark(context.Background(), max); err != nil {
65,228✔
294
                glog.Errorf("Error while waiting for mark: %+v", err)
×
295
        }
×
296

297
        // Now send it out to updates.
298
        o.updates <- &pb.OracleDelta{MaxAssigned: max}
65,228✔
299

65,228✔
300
        o.Lock()
65,228✔
301
        defer o.Unlock()
65,228✔
302
        o.maxAssigned = x.Max(o.maxAssigned, max)
65,228✔
303
}
304

305
// MaxPending returns the maximum assigned timestamp.
306
func (o *Oracle) MaxPending() uint64 {
460✔
307
        o.RLock()
460✔
308
        defer o.RUnlock()
460✔
309
        return o.maxAssigned
460✔
310
}
460✔
311

312
// proposeTxn proposes a txn update, and then updates src to reflect the state
313
// of the commit after proposal is run.
314
func (s *Server) proposeTxn(ctx context.Context, src *api.TxnContext) error {
11,300✔
315
        var zp pb.ZeroProposal
11,300✔
316
        zp.Txn = &api.TxnContext{
11,300✔
317
                StartTs:  src.StartTs,
11,300✔
318
                CommitTs: src.CommitTs,
11,300✔
319
                Aborted:  src.Aborted,
11,300✔
320
        }
11,300✔
321

11,300✔
322
        // NOTE: It is important that we continue retrying proposeTxn until we succeed. This should
11,300✔
323
        // happen, irrespective of what the user context timeout might be. We check for it before
11,300✔
324
        // reaching this stage, but now that we're here, we have to ensure that the commit proposal goes
11,300✔
325
        // through. Otherwise, we should block here forever. If we don't do this, we'll see txn
11,300✔
326
        // violations in Jepsen, because we'll send out a MaxAssigned higher than a commit, which would
11,300✔
327
        // cause newer txns to see older data.
11,300✔
328

11,300✔
329
        // If this node stops being the leader, we want this proposal to not be forwarded to the leader,
11,300✔
330
        // and get aborted.
11,300✔
331
        if err := s.Node.proposeAndWait(ctx, &zp); err != nil {
11,300✔
332
                return err
×
333
        }
×
334

335
        // There might be race between this proposal trying to commit and predicate
336
        // move aborting it. A predicate move, triggered by Zero, would abort all
337
        // pending transactions.  At the same time, a client which has already done
338
        // mutations, can proceed to commit it. A race condition can happen here,
339
        // with both proposing their respective states, only one can succeed after
340
        // the proposal is done. So, check again to see the fate of the transaction
341
        // here.
342
        src.CommitTs = s.orc.commitTs(src.StartTs)
11,300✔
343
        if src.CommitTs == 0 {
13,639✔
344
                src.Aborted = true
2,339✔
345
        }
2,339✔
346
        return nil
11,300✔
347
}
348

349
func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
11,300✔
350
        span := otrace.FromContext(ctx)
11,300✔
351
        span.Annotate([]otrace.Attribute{otrace.Int64Attribute("startTs", int64(src.StartTs))}, "")
11,300✔
352
        if src.Aborted {
11,745✔
353
                return s.proposeTxn(ctx, src)
445✔
354
        }
445✔
355

356
        // Use the start timestamp to check if we have a conflict, before we need to assign a commit ts.
357
        s.orc.RLock()
10,855✔
358
        conflict := s.orc.hasConflict(src)
10,855✔
359
        s.orc.RUnlock()
10,855✔
360
        if conflict {
12,711✔
361
                span.Annotate([]otrace.Attribute{otrace.BoolAttribute("abort", true)},
1,856✔
362
                        "Oracle found conflict")
1,856✔
363
                src.Aborted = true
1,856✔
364
                return s.proposeTxn(ctx, src)
1,856✔
365
        }
1,856✔
366

367
        checkPreds := func() error {
17,998✔
368
                // Check if any of these tablets is being moved. If so, abort the transaction.
8,999✔
369
                for _, pkey := range src.Preds {
28,486✔
370
                        splits := strings.SplitN(pkey, "-", 2)
19,487✔
371
                        if len(splits) < 2 {
19,487✔
372
                                return errors.Errorf("Unable to find group id in %s", pkey)
×
373
                        }
×
374
                        gid, err := strconv.Atoi(splits[0])
19,487✔
375
                        if err != nil {
19,487✔
376
                                return errors.Wrapf(err, "unable to parse group id from %s", pkey)
×
377
                        }
×
378
                        pred := splits[1]
19,487✔
379
                        tablet := s.ServingTablet(pred)
19,487✔
380
                        if tablet == nil {
19,487✔
381
                                return errors.Errorf("Tablet for %s is nil", pred)
×
382
                        }
×
383
                        if tablet.GroupId != uint32(gid) {
19,487✔
384
                                return errors.Errorf("Mutation done in group: %d. Predicate %s assigned to %d",
×
385
                                        gid, pred, tablet.GroupId)
×
386
                        }
×
387
                        if s.isBlocked(pred) {
19,487✔
388
                                return errors.Errorf("Commits on predicate %s are blocked due to predicate move", pred)
×
389
                        }
×
390
                }
391
                return nil
8,999✔
392
        }
393
        if err := checkPreds(); err != nil {
8,999✔
394
                span.Annotate([]otrace.Attribute{otrace.BoolAttribute("abort", true)}, err.Error())
×
395
                src.Aborted = true
×
396
                return s.proposeTxn(ctx, src)
×
397
        }
×
398

399
        num := pb.Num{Val: 1, Type: pb.Num_TXN_TS}
8,999✔
400
        assigned, err := s.lease(ctx, &num)
8,999✔
401
        if err != nil {
8,999✔
402
                return err
×
403
        }
×
404
        src.CommitTs = assigned.StartId
8,999✔
405
        // Mark the transaction as done, irrespective of whether the proposal succeeded or not.
8,999✔
406
        defer s.orc.doneUntil.Done(src.CommitTs)
8,999✔
407
        span.Annotatef([]otrace.Attribute{otrace.Int64Attribute("commitTs", int64(src.CommitTs))},
8,999✔
408
                "Node Id: %d. Proposing TxnContext: %+v", s.Node.Id, src)
8,999✔
409

8,999✔
410
        if err := s.orc.commit(src); err != nil {
9,038✔
411
                span.Annotatef(nil, "Found a conflict. Aborting.")
39✔
412
                src.Aborted = true
39✔
413
        }
39✔
414
        if err := ctx.Err(); err != nil {
8,999✔
415
                span.Annotatef(nil, "Aborting txn due to context timing out.")
×
416
                src.Aborted = true
×
417
        }
×
418
        // Propose txn should be used to set watermark as done.
419
        return s.proposeTxn(ctx, src)
8,999✔
420
}
421

422
// CommitOrAbort either commits a transaction or aborts it.
423
// The abortion can happen under the following conditions
424
// 1) the api.TxnContext.Aborted flag is set in the src argument
425
// 2) if there's an error (e.g server is not the leader or there's a conflicting transaction)
426
func (s *Server) CommitOrAbort(ctx context.Context, src *api.TxnContext) (*api.TxnContext, error) {
11,300✔
427
        if ctx.Err() != nil {
11,300✔
428
                return nil, ctx.Err()
×
429
        }
×
430
        ctx, span := otrace.StartSpan(ctx, "Zero.CommitOrAbort")
11,300✔
431
        defer span.End()
11,300✔
432

11,300✔
433
        if !s.Node.AmLeader() {
11,300✔
434
                return nil, errors.Errorf("Only leader can decide to commit or abort")
×
435
        }
×
436
        err := s.commit(ctx, src)
11,300✔
437
        if err != nil {
11,300✔
438
                span.Annotate([]otrace.Attribute{otrace.BoolAttribute("error", true)}, err.Error())
×
439
        }
×
440
        return src, err
11,300✔
441
}
442

443
var errClosed = errors.New("Streaming closed by oracle")
444
var errNotLeader = errors.New("Node is no longer leader")
445

446
// Oracle streams the oracle state to the alphas.
447
// The first entry sent by Zero contains the entire state of transactions. Zero periodically
448
// confirms receipt from the group, and truncates its state. This 2-way acknowledgement is a
449
// safe way to get the status of all the transactions.
450
func (s *Server) Oracle(_ *api.Payload, server pb.Zero_OracleServer) error {
78✔
451
        if !s.Node.AmLeader() {
78✔
452
                return errNotLeader
×
453
        }
×
454
        ch, id := s.orc.newSubscriber()
78✔
455
        defer s.orc.removeSubscriber(id)
78✔
456

78✔
457
        ctx := server.Context()
78✔
458
        leaderChangeCh := s.leaderChangeChannel()
78✔
459
        for {
152,580✔
460
                select {
152,502✔
461
                case <-leaderChangeCh:
×
462
                        return errNotLeader
×
463
                case delta, open := <-ch:
152,424✔
464
                        if !open {
152,424✔
465
                                return errClosed
×
466
                        }
×
467
                        // Pass in the latest group checksum as well, so the Alpha can use that to determine
468
                        // when not to service a read.
469
                        delta.GroupChecksums = s.groupChecksums()
152,424✔
470
                        if err := server.Send(&delta); err != nil {
152,424✔
471
                                return err
×
472
                        }
×
473
                case <-ctx.Done():
36✔
474
                        return ctx.Err()
36✔
475
                case <-s.closer.HasBeenClosed():
42✔
476
                        return errServerShutDown
42✔
477
                }
478
        }
479
}
480

481
// TryAbort attempts to abort the given transactions which are not already committed..
482
func (s *Server) TryAbort(ctx context.Context,
483
        txns *pb.TxnTimestamps) (*pb.OracleDelta, error) {
×
484
        delta := &pb.OracleDelta{}
×
485
        for _, startTs := range txns.Ts {
×
486
                // Do via proposals to avoid race
×
487
                tctx := &api.TxnContext{StartTs: startTs, Aborted: true}
×
488
                if err := s.proposeTxn(ctx, tctx); err != nil {
×
489
                        return delta, err
×
490
                }
×
491
                // Txn should be aborted if not already committed.
492
                delta.Txns = append(delta.Txns, &pb.TxnStatus{
×
493
                        StartTs:  startTs,
×
494
                        CommitTs: s.orc.commitTs(startTs)})
×
495
        }
496
        return delta, nil
×
497
}
498

499
// Timestamps is used to assign startTs for a new transaction
500
func (s *Server) Timestamps(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error) {
65,943✔
501
        ctx, span := otrace.StartSpan(ctx, "Zero.Timestamps")
65,943✔
502
        defer span.End()
65,943✔
503

65,943✔
504
        span.Annotatef(nil, "Zero id: %d. Timestamp request: %+v", s.Node.Id, num)
65,943✔
505
        if ctx.Err() != nil {
65,943✔
506
                return &emptyAssignedIds, ctx.Err()
×
507
        }
×
508

509
        num.Type = pb.Num_TXN_TS
65,943✔
510
        reply, err := s.lease(ctx, num)
65,943✔
511
        span.Annotatef(nil, "Response: %+v. Error: %v", reply, err)
65,943✔
512

65,943✔
513
        switch err {
65,943✔
514
        case nil:
65,228✔
515
                s.orc.doneUntil.Done(x.Max(reply.EndId, reply.ReadOnly))
65,228✔
516
                go s.orc.storePending(reply)
65,228✔
517
        case errServedFromMemory:
714✔
518
                // Avoid calling doneUntil.Done, and storePending.
714✔
519
                err = nil
714✔
520
        default:
1✔
521
                glog.Errorf("Got error: %v while leasing timestamps: %+v", err, num)
1✔
522
        }
523
        return reply, err
65,943✔
524
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc