dgraph-io / dgraph / build 6000148643 (push, committed via web-flow)

28 Aug 2023 12:58PM UTC coverage: 67.112% (+0.5%) from 66.655%

feat(dql): add @unique constraint support in schema for new predicates (#8827)

Partially Fixes #8827
Closes: DGRAPHCORE-206
Docs PR: https://github.com/dgraph-io/dgraph-docs/pull/638

This PR adds support for a uniqueness constraint via the @unique directive
in the DQL schema. The directive ensures that all values of the predicate
are distinct across a Dgraph cluster. This completes phase 1, which enables
adding a new predicate with the @unique directive. In phase 2, we will add
support for applying the @unique directive to existing predicates.

## Performance
Live Loader on the 21 million dataset took 10m54s before this change and
11m02s after it. The change makes no significant difference for non-unique
predicates.

## How to Use
You can now specify @unique in the schema as follows: `email: string @unique
@index(hash) @upsert .`. Dgraph will then ensure that no mutation adds a
duplicate value for the email predicate.
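
For illustration, here is a minimal sketch of exercising the constraint from Go using the dgo client. The connection address, blank-node names, email values, and error handling are assumptions for the example, not part of this PR; the only behavior relied on is that a mutation reusing an existing value of a @unique predicate is rejected.

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/dgraph-io/dgo/v230"
	"github.com/dgraph-io/dgo/v230/protos/api"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Assumes a local Alpha listening on the default gRPC port.
	conn, err := grpc.Dial("localhost:9080", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))
	ctx := context.Background()

	// Apply the schema from the example above.
	if err := dg.Alter(ctx, &api.Operation{
		Schema: "email: string @unique @index(hash) @upsert .",
	}); err != nil {
		log.Fatal(err)
	}

	// First mutation succeeds.
	mu := &api.Mutation{SetNquads: []byte(`_:a <email> "alice@example.com" .`), CommitNow: true}
	if _, err := dg.NewTxn().Mutate(ctx, mu); err != nil {
		log.Fatal(err)
	}

	// Second mutation reuses the same email and should be rejected by the @unique constraint.
	dup := &api.Mutation{SetNquads: []byte(`_:b <email> "alice@example.com" .`), CommitNow: true}
	if _, err := dg.NewTxn().Mutate(ctx, dup); err != nil {
		fmt.Println("duplicate rejected:", err)
	}
}
```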

## Phase 2 [TODO]
- [ ] Check whether @unique can be added to the schema for a predicate based on
whether the existing data contains duplicates. If it does, reject the @unique
directive and return a query that lets the user identify the offending UIDs.
- [ ] Disallow mutations on predicates with @unique set while index computation
is in progress.
- [ ] Fix ACL so that we do not end up adding duplicate users.
- [ ] Ensure that the unique constraint is not violated by the Bulk Loader.
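
Until phase 2 lands, uniqueness on pre-existing predicates still has to be enforced by the client, typically with a conditional upsert against an @upsert-indexed predicate. Below is a minimal sketch of that manual pattern via dgo; the function name, predicate, and values are illustrative assumptions and not part of this PR.

```go
package example

import (
	"context"
	"fmt"

	"github.com/dgraph-io/dgo/v230"
	"github.com/dgraph-io/dgo/v230/protos/api"
)

// insertUniqueEmail is the pre-@unique workaround: a conditional upsert that
// only writes the email if no node already holds it. dg is a connected client
// as in the earlier sketch; the email predicate has @index(hash) @upsert.
func insertUniqueEmail(ctx context.Context, dg *dgo.Dgraph, email string) error {
	req := &api.Request{
		// Find any node already holding this email.
		Query: fmt.Sprintf(`query { q(func: eq(email, %q)) { v as uid } }`, email),
		Mutations: []*api.Mutation{{
			// Run the mutation only when the query found nothing.
			Cond:      `@if(eq(len(v), 0))`,
			SetNquads: []byte(fmt.Sprintf(`_:new <email> %q .`, email)),
		}},
		CommitNow: true,
	}
	resp, err := dg.NewTxn().Do(ctx, req)
	if err != nil {
		return err
	}
	// If the conditional mutation was skipped, no blank-node UID was assigned.
	if len(resp.Uids) == 0 {
		return fmt.Errorf("email %q already exists", email)
	}
	return nil
}
```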

---------

Co-authored-by: Aman Mangal <aman@dgraph.io>

347 of 347 new or added lines in 8 files covered. (100.0%)

58763 of 87560 relevant lines covered (67.11%)

2200726.47 hits per line

Source File: /dgraph/cmd/zero/oracle.go (81.13% of lines covered)
/*
 * Copyright 2017-2023 Dgraph Labs, Inc. and Contributors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package zero

import (
        "context"
        "math/rand"
        "strconv"
        "strings"
        "time"

        "github.com/golang/glog"
        "github.com/pkg/errors"
        otrace "go.opencensus.io/trace"

        "github.com/dgraph-io/badger/v4/y"
        "github.com/dgraph-io/dgo/v230/protos/api"
        "github.com/dgraph-io/dgraph/protos/pb"
        "github.com/dgraph-io/dgraph/x"
        "github.com/dgraph-io/ristretto/z"
)

// Oracle stores and manages the transaction state and conflict detection.
type Oracle struct {
        x.SafeMutex
        commits map[uint64]uint64 // startTs -> commitTs
        // TODO: Check if we need LRU.
        keyCommit   *z.Tree // fp(key) -> commitTs. Used to detect conflict.
        maxAssigned uint64  // max transaction assigned by us.

        // All transactions with startTs < startTxnTs return true for hasConflict.
        startTxnTs  uint64
        subscribers map[int]chan pb.OracleDelta
        updates     chan *pb.OracleDelta
        doneUntil   y.WaterMark
}

// Init initializes the oracle.
func (o *Oracle) Init() {
        o.commits = make(map[uint64]uint64)
        // Remove the older btree file, before creating NewTree, as it may contain stale data leading
        // to wrong results.
        o.keyCommit = z.NewTree("oracle")
        o.subscribers = make(map[int]chan pb.OracleDelta)
        o.updates = make(chan *pb.OracleDelta, 100000) // Keeping 1 second worth of updates.
        o.doneUntil.Init(nil)
        go o.sendDeltasToSubscribers()
}

// close releases the memory associated with btree used for keycommit.
func (o *Oracle) close() {
        if err := o.keyCommit.Close(); err != nil {
                glog.Warningf("error while closing tree: %v", err)
        }
}

func (o *Oracle) updateStartTxnTs(ts uint64) {
        o.Lock()
        defer o.Unlock()
        o.startTxnTs = ts
        o.keyCommit.Reset()
}

// TODO: This should be done during proposal application for Txn status.
func (o *Oracle) hasConflict(src *api.TxnContext) bool {
        // This transaction was started before I became leader.
        if src.StartTs < o.startTxnTs {
                return true
        }
        for _, k := range src.Keys {
                ki, err := strconv.ParseUint(k, 36, 64)
                if err != nil {
                        glog.Errorf("Got error while parsing conflict key %q: %v\n", k, err)
                        continue
                }
                if last := o.keyCommit.Get(ki); last > src.StartTs {
                        return true
                }
        }
        return false
}

func (o *Oracle) purgeBelow(minTs uint64) {
        var timer x.Timer
        timer.Start()

        o.Lock()
        defer o.Unlock()

        // Set startTxnTs so that every txn with start ts less than this, would be aborted.
        o.startTxnTs = minTs

        // Dropping would be cheaper if abort/commits map is sharded
        for ts := range o.commits {
                if ts < minTs {
                        delete(o.commits, ts)
                }
        }
        timer.Record("commits")

        // There is no transaction running with startTs less than minTs
        // So we can delete everything from rowCommit whose commitTs < minTs
        stats := o.keyCommit.Stats()
        if stats.Occupancy < 50.0 {
                return
        }
        o.keyCommit.DeleteBelow(minTs)
        timer.Record("deleteBelow")
        glog.V(2).Infof("Purged below ts:%d, len(o.commits):%d, keyCommit: [before: %+v, after: %+v].\n",
                minTs, len(o.commits), stats, o.keyCommit.Stats())
        if timer.Total() > time.Second {
                glog.V(2).Infof("Purge %s\n", timer.String())
        }
}

func (o *Oracle) commit(src *api.TxnContext) error {
        o.Lock()
        defer o.Unlock()

        if o.hasConflict(src) {
                return x.ErrConflict
        }
        // We store src.Keys as string to ensure compatibility with all the various language clients we
        // have. But, really they are just uint64s encoded as strings. We use base 36 during creation of
        // these keys in FillContext in posting/mvcc.go.
        for _, k := range src.Keys {
                ki, err := strconv.ParseUint(k, 36, 64)
                if err != nil {
                        glog.Errorf("Got error while parsing conflict key %q: %v\n", k, err)
                        continue
                }
                o.keyCommit.Set(ki, src.CommitTs) // CommitTs is handed out before calling this func.
        }
        return nil
}

func (o *Oracle) currentState() *pb.OracleDelta {
        o.AssertRLock()
        resp := &pb.OracleDelta{}
        for start, commit := range o.commits {
                resp.Txns = append(resp.Txns,
                        &pb.TxnStatus{StartTs: start, CommitTs: commit})
        }
        resp.MaxAssigned = o.maxAssigned
        return resp
}

func (o *Oracle) newSubscriber() (<-chan pb.OracleDelta, int) {
        o.Lock()
        defer o.Unlock()
        var id int
        for {
                //nolint:gosec // random generator for node id does not require cryptographic precision
                id = rand.Int()
                if _, has := o.subscribers[id]; !has {
                        break
                }
        }

        // The channel takes a delta instead of a pointer as the receiver needs to
        // modify it by setting the group checksums. Passing a pointer previously
        // resulted in a race condition.
        ch := make(chan pb.OracleDelta, 1000)
        ch <- *o.currentState() // Queue up the full state as the first entry.
        o.subscribers[id] = ch
        return ch, id
}

func (o *Oracle) removeSubscriber(id int) {
        o.Lock()
        defer o.Unlock()
        delete(o.subscribers, id)
}

// sendDeltasToSubscribers reads updates from the o.updates
// constructs a delta object containing transactions from one or more updates
// and sends the delta object to each subscriber's channel
func (o *Oracle) sendDeltasToSubscribers() {
        delta := &pb.OracleDelta{}
        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()

        // waitFor calculates the maximum value of delta.MaxAssigned and all the CommitTs of delta.Txns
        waitFor := func() uint64 {
                w := delta.MaxAssigned
                for _, txn := range delta.Txns {
                        w = x.Max(w, txn.CommitTs)
                }
                return w
        }

        for {
        get_update:
                var update *pb.OracleDelta
                select {
                case update = <-o.updates:
                case <-ticker.C:
                        wait := waitFor()
                        if wait == 0 || o.doneUntil.DoneUntil() < wait {
                                goto get_update
                        }
                        // Send empty update.
                        update = &pb.OracleDelta{}
                }
        slurp_loop:
                for {
                        delta.MaxAssigned = x.Max(delta.MaxAssigned, update.MaxAssigned)
                        delta.Txns = append(delta.Txns, update.Txns...)
                        select {
                        case update = <-o.updates:
                        default:
                                break slurp_loop
                        }
                }
                // No need to sort the txn updates here. Alpha would sort them before
                // applying.

                // Let's ensure that we have all the commits up until the max here.
                // Otherwise, we'll be sending commit timestamps out of order, which
                // would cause Alphas to drop some of them, during writes to Badger.
                if o.doneUntil.DoneUntil() < waitFor() {
                        continue // The for loop doing blocking reads from o.updates.
                        // We need at least one entry from the updates channel to pick up a missing update.
                        // Don't goto slurp_loop, because it would break from select immediately.
                }

                if glog.V(3) {
                        glog.Infof("DoneUntil: %d. Sending delta: %+v\n", o.doneUntil.DoneUntil(), delta)
                }
                o.Lock()
                for id, ch := range o.subscribers {
                        select {
                        case ch <- *delta:
                        default:
                                close(ch)
                                delete(o.subscribers, id)
                        }
                }
                o.Unlock()
                delta = &pb.OracleDelta{}
        }
}

func (o *Oracle) updateCommitStatusHelper(index uint64, src *api.TxnContext) bool {
        o.Lock()
        defer o.Unlock()
        if _, ok := o.commits[src.StartTs]; ok {
                return false
        }
        if src.Aborted {
                o.commits[src.StartTs] = 0
        } else {
                o.commits[src.StartTs] = src.CommitTs
        }
        return true
}

func (o *Oracle) updateCommitStatus(index uint64, src *api.TxnContext) {
        // TODO: We should check if the tablet is in read-only status here.
        if o.updateCommitStatusHelper(index, src) {
                delta := new(pb.OracleDelta)
                delta.Txns = append(delta.Txns, &pb.TxnStatus{
                        StartTs:  src.StartTs,
                        CommitTs: o.commitTs(src.StartTs),
                })
                o.updates <- delta
        }
}

func (o *Oracle) commitTs(startTs uint64) uint64 {
        o.RLock()
        defer o.RUnlock()
        return o.commits[startTs]
}

func (o *Oracle) storePending(ids *pb.AssignedIds) {
        // Wait to finish up processing everything before start id.
        max := x.Max(ids.EndId, ids.ReadOnly)
        if err := o.doneUntil.WaitForMark(context.Background(), max); err != nil {
                glog.Errorf("Error while waiting for mark: %+v", err)
        }

        // Now send it out to updates.
        o.updates <- &pb.OracleDelta{MaxAssigned: max}

        o.Lock()
        defer o.Unlock()
        o.maxAssigned = x.Max(o.maxAssigned, max)
}

// MaxPending returns the maximum assigned timestamp.
func (o *Oracle) MaxPending() uint64 {
        o.RLock()
        defer o.RUnlock()
        return o.maxAssigned
}

// proposeTxn proposes a txn update, and then updates src to reflect the state
// of the commit after proposal is run.
func (s *Server) proposeTxn(ctx context.Context, src *api.TxnContext) error {
        var zp pb.ZeroProposal
        zp.Txn = &api.TxnContext{
                StartTs:  src.StartTs,
                CommitTs: src.CommitTs,
                Aborted:  src.Aborted,
        }

        // NOTE: It is important that we continue retrying proposeTxn until we succeed. This should
        // happen, irrespective of what the user context timeout might be. We check for it before
        // reaching this stage, but now that we're here, we have to ensure that the commit proposal goes
        // through. Otherwise, we should block here forever. If we don't do this, we'll see txn
        // violations in Jepsen, because we'll send out a MaxAssigned higher than a commit, which would
        // cause newer txns to see older data.

        // If this node stops being the leader, we want this proposal to not be forwarded to the leader,
        // and get aborted.
        if err := s.Node.proposeAndWait(ctx, &zp); err != nil {
                return err
        }

        // There might be race between this proposal trying to commit and predicate
        // move aborting it. A predicate move, triggered by Zero, would abort all
        // pending transactions.  At the same time, a client which has already done
        // mutations, can proceed to commit it. A race condition can happen here,
        // with both proposing their respective states, only one can succeed after
        // the proposal is done. So, check again to see the fate of the transaction
        // here.
        src.CommitTs = s.orc.commitTs(src.StartTs)
        if src.CommitTs == 0 {
                src.Aborted = true
        }
        return nil
}

func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
        span := otrace.FromContext(ctx)
        span.Annotate([]otrace.Attribute{otrace.Int64Attribute("startTs", int64(src.StartTs))}, "")
        if src.Aborted {
                return s.proposeTxn(ctx, src)
        }

        // Use the start timestamp to check if we have a conflict, before we need to assign a commit ts.
        s.orc.RLock()
        conflict := s.orc.hasConflict(src)
        s.orc.RUnlock()
        if conflict {
                span.Annotate([]otrace.Attribute{otrace.BoolAttribute("abort", true)},
                        "Oracle found conflict")
                src.Aborted = true
                return s.proposeTxn(ctx, src)
        }

        checkPreds := func() error {
                // Check if any of these tablets is being moved. If so, abort the transaction.
                for _, pkey := range src.Preds {
                        splits := strings.SplitN(pkey, "-", 2)
                        if len(splits) < 2 {
                                return errors.Errorf("Unable to find group id in %s", pkey)
                        }
                        gid, err := strconv.Atoi(splits[0])
                        if err != nil {
                                return errors.Wrapf(err, "unable to parse group id from %s", pkey)
                        }
                        pred := splits[1]
                        tablet := s.ServingTablet(pred)
                        if tablet == nil {
                                return errors.Errorf("Tablet for %s is nil", pred)
                        }
                        if tablet.GroupId != uint32(gid) {
                                return errors.Errorf("Mutation done in group: %d. Predicate %s assigned to %d",
                                        gid, pred, tablet.GroupId)
                        }
                        if s.isBlocked(pred) {
                                return errors.Errorf("Commits on predicate %s are blocked due to predicate move", pred)
                        }
                }
                return nil
        }
        if err := checkPreds(); err != nil {
                span.Annotate([]otrace.Attribute{otrace.BoolAttribute("abort", true)}, err.Error())
                src.Aborted = true
                return s.proposeTxn(ctx, src)
        }

        num := pb.Num{Val: 1, Type: pb.Num_TXN_TS}
        assigned, err := s.lease(ctx, &num)
        if err != nil {
                return err
        }
        src.CommitTs = assigned.StartId
        // Mark the transaction as done, irrespective of whether the proposal succeeded or not.
        defer s.orc.doneUntil.Done(src.CommitTs)
        span.Annotatef([]otrace.Attribute{otrace.Int64Attribute("commitTs", int64(src.CommitTs))},
                "Node Id: %d. Proposing TxnContext: %+v", s.Node.Id, src)

        if err := s.orc.commit(src); err != nil {
                span.Annotatef(nil, "Found a conflict. Aborting.")
                src.Aborted = true
        }
        if err := ctx.Err(); err != nil {
                span.Annotatef(nil, "Aborting txn due to context timing out.")
                src.Aborted = true
        }
        // Propose txn should be used to set watermark as done.
        return s.proposeTxn(ctx, src)
}

// CommitOrAbort either commits a transaction or aborts it.
// The abortion can happen under the following conditions
// 1) the api.TxnContext.Aborted flag is set in the src argument
// 2) if there's an error (e.g server is not the leader or there's a conflicting transaction)
func (s *Server) CommitOrAbort(ctx context.Context, src *api.TxnContext) (*api.TxnContext, error) {
        if ctx.Err() != nil {
                return nil, ctx.Err()
        }
        ctx, span := otrace.StartSpan(ctx, "Zero.CommitOrAbort")
        defer span.End()

        if !s.Node.AmLeader() {
                return nil, errors.Errorf("Only leader can decide to commit or abort")
        }
        err := s.commit(ctx, src)
        if err != nil {
                span.Annotate([]otrace.Attribute{otrace.BoolAttribute("error", true)}, err.Error())
        }
        return src, err
}

var errClosed = errors.New("Streaming closed by oracle")
var errNotLeader = errors.New("Node is no longer leader")

// Oracle streams the oracle state to the alphas.
// The first entry sent by Zero contains the entire state of transactions. Zero periodically
// confirms receipt from the group, and truncates its state. This 2-way acknowledgement is a
// safe way to get the status of all the transactions.
func (s *Server) Oracle(_ *api.Payload, server pb.Zero_OracleServer) error {
        if !s.Node.AmLeader() {
                return errNotLeader
        }
        ch, id := s.orc.newSubscriber()
        defer s.orc.removeSubscriber(id)

        ctx := server.Context()
        leaderChangeCh := s.leaderChangeChannel()
        for {
                select {
                case <-leaderChangeCh:
                        return errNotLeader
                case delta, open := <-ch:
                        if !open {
                                return errClosed
                        }
                        // Pass in the latest group checksum as well, so the Alpha can use that to determine
                        // when not to service a read.
                        delta.GroupChecksums = s.groupChecksums()
                        if err := server.Send(&delta); err != nil {
                                return err
                        }
                case <-ctx.Done():
                        return ctx.Err()
                case <-s.closer.HasBeenClosed():
                        return errServerShutDown
                }
        }
}

// TryAbort attempts to abort the given transactions which are not already committed..
func (s *Server) TryAbort(ctx context.Context,
        txns *pb.TxnTimestamps) (*pb.OracleDelta, error) {
        delta := &pb.OracleDelta{}
        for _, startTs := range txns.Ts {
                // Do via proposals to avoid race
                tctx := &api.TxnContext{StartTs: startTs, Aborted: true}
                if err := s.proposeTxn(ctx, tctx); err != nil {
                        return delta, err
                }
                // Txn should be aborted if not already committed.
                delta.Txns = append(delta.Txns, &pb.TxnStatus{
                        StartTs:  startTs,
                        CommitTs: s.orc.commitTs(startTs)})
        }
        return delta, nil
}

// Timestamps is used to assign startTs for a new transaction
func (s *Server) Timestamps(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error) {
        ctx, span := otrace.StartSpan(ctx, "Zero.Timestamps")
        defer span.End()

        span.Annotatef(nil, "Zero id: %d. Timestamp request: %+v", s.Node.Id, num)
        if ctx.Err() != nil {
                return &emptyAssignedIds, ctx.Err()
        }

        num.Type = pb.Num_TXN_TS
        reply, err := s.lease(ctx, num)
        span.Annotatef(nil, "Response: %+v. Error: %v", reply, err)

        switch err {
        case nil:
                s.orc.doneUntil.Done(x.Max(reply.EndId, reply.ReadOnly))
                go s.orc.storePending(reply)
        case errServedFromMemory:
                // Avoid calling doneUntil.Done, and storePending.
                err = nil
        default:
                glog.Errorf("Got error: %v while leasing timestamps: %+v", err, num)
        }
        return reply, err
}