
tarantool / tarantool, build 9433 (push, travis-ci): pending completion

Commit by locker:
box: implement persistent sequences

This patch implements a new object type, persistent sequences. Sequences
are created with the function box.schema.sequence.create(name, options).
Options include min/max, the start value, the increment, and the cache
size, just like in PostgreSQL, although 'cache' is ignored for now. All
sequences can be accessed via box.sequence.<name>, similarly to spaces.
To generate a sequence value, use the seq:next() method. To retrieve the
last generated value, use seq:get(). A sequence value can also be reset
to the start value or set to any other value using the seq:reset() and
seq:set() methods.

Needed for #389
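
For illustration, a minimal Lua sketch of the API described above (a
hypothetical session; option names follow the commit message, except that
the increment option is assumed to be spelled 'step'):

    box.schema.sequence.create('S', {min = 1, max = 1000, start = 1, step = 1})
    box.sequence.S:next()   -- 1
    box.sequence.S:next()   -- 2
    box.sequence.S:get()    -- 2, the last generated value
    box.sequence.S:set(100) -- set the sequence value to 100
    box.sequence.S:reset()  -- back to the start value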

268 of 268 new or added lines in 9 files covered. (100.0%)

32846 of 37402 relevant lines covered (87.82%)

1100205.83 hits per line

Source File

/src/box/vy_tx.c (93.79% of lines covered)
/*
 * Copyright 2010-2017, Tarantool AUTHORS, please see AUTHORS file.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 * 1. Redistributions of source code must retain the above
 *    copyright notice, this list of conditions and the
 *    following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above
 *    copyright notice, this list of conditions and the following
 *    disclaimer in the documentation and/or other materials
 *    provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
 * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
 * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
#include "vy_tx.h"

#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

#include <small/mempool.h>
#include <small/rlist.h>

#include "diag.h"
#include "errcode.h"
#include "fiber.h"
#include "iproto_constants.h"
#include "iterator_type.h"
#include "salad/stailq.h"
#include "schema.h" /* schema_version */
#include "trigger.h"
#include "trivia/util.h"
#include "tuple.h"
#include "vy_cache.h"
#include "vy_index.h"
#include "vy_mem.h"
#include "vy_stat.h"
#include "vy_stmt.h"
#include "vy_stmt_iterator.h"
#include "vy_upsert.h"
#include "vy_read_set.h"

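/**
 * Comparator for the transaction write set: order elements by
 * the index they belong to first, then by statement key.
 */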
int
write_set_cmp(struct txv *a, struct txv *b)
{
        int rc = a->index < b->index ? -1 : a->index > b->index;
        if (rc == 0)
                return vy_stmt_compare(a->stmt, b->stmt, a->index->cmp_def);
        return rc;
}

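/** Compare an {index, key} search key with a write set element. */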
int
write_set_key_cmp(struct write_set_key *a, struct txv *b)
{
        int rc = a->index < b->index ? -1 : a->index > b->index;
        if (rc == 0)
                return vy_stmt_compare(a->stmt, b->stmt, a->index->cmp_def);
        return rc;
}

/**
 * Initialize an instance of a global read view.
 * To be used exclusively by the transaction manager.
 */
static void
vy_global_read_view_create(struct vy_read_view *rv, int64_t lsn)
{
        rlist_create(&rv->in_read_views);
        /*
         * By default, the transaction is assumed to be
         * read-write, and it reads the latest changes of all
         * prepared transactions. This makes it possible to
         * use the tuple cache in it.
         */
        rv->vlsn = lsn;
        rv->refs = 0;
        rv->is_aborted = false;
}

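/**
 * Allocate and initialize a transaction manager along with its
 * global and committed read views and the memory pools used for
 * transactions, write set elements, read intervals and read views.
 */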
struct tx_manager *
tx_manager_new(void)
{
        struct tx_manager *xm = calloc(1, sizeof(*xm));
        if (xm == NULL) {
                diag_set(OutOfMemory, sizeof(*xm),
                         "malloc", "struct tx_manager");
                return NULL;
        }

        rlist_create(&xm->read_views);
        vy_global_read_view_create((struct vy_read_view *)&xm->global_read_view,
                                   INT64_MAX);
        xm->p_global_read_view = &xm->global_read_view;
        vy_global_read_view_create((struct vy_read_view *)&xm->committed_read_view,
                                   MAX_LSN - 1);
        xm->p_committed_read_view = &xm->committed_read_view;

        struct slab_cache *slab_cache = cord_slab_cache();
        mempool_create(&xm->tx_mempool, slab_cache, sizeof(struct vy_tx));
        mempool_create(&xm->txv_mempool, slab_cache, sizeof(struct txv));
        mempool_create(&xm->read_interval_mempool, slab_cache,
                       sizeof(struct vy_read_interval));
        mempool_create(&xm->read_view_mempool, slab_cache,
                       sizeof(struct vy_read_view));
        return xm;
}

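/** Delete a transaction manager and destroy its memory pools. */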
void
tx_manager_delete(struct tx_manager *xm)
{
        mempool_destroy(&xm->read_view_mempool);
        mempool_destroy(&xm->read_interval_mempool);
        mempool_destroy(&xm->txv_mempool);
        mempool_destroy(&xm->tx_mempool);
        free(xm);
}

/** Create or reuse an instance of a read view. */
static struct vy_read_view *
tx_manager_read_view(struct tx_manager *xm)
{
        struct vy_read_view *rv;
        /*
         * Check if the last read view can be reused. Reference
         * and return it if it's the case.
         */
        if (!rlist_empty(&xm->read_views)) {
                rv = rlist_last_entry(&xm->read_views, struct vy_read_view,
                                      in_read_views);
                /** Reuse an existing read view */
                if ((xm->last_prepared_tx == NULL && rv->vlsn == xm->lsn) ||
                    (xm->last_prepared_tx != NULL &&
                     rv->vlsn == MAX_LSN + xm->last_prepared_tx->psn)) {
                        rv->refs++;
                        return rv;
                }
        }
        rv = mempool_alloc(&xm->read_view_mempool);
        if (rv == NULL) {
                diag_set(OutOfMemory, sizeof(*rv),
                         "mempool", "read view");
                return NULL;
        }
        rv->is_aborted = false;
        if (xm->last_prepared_tx != NULL) {
                rv->vlsn = MAX_LSN + xm->last_prepared_tx->psn;
                xm->last_prepared_tx->read_view = rv;
                rv->refs = 2;
        } else {
                rv->vlsn = xm->lsn;
                rv->refs = 1;
        }
        /*
         * Add to the tail of the list, so that tx_manager_vlsn()
         * works correctly.
         */
        rlist_add_tail_entry(&xm->read_views, rv, in_read_views);
        return rv;
}

/** Dereference and possibly destroy a read view. */
static void
tx_manager_destroy_read_view(struct tx_manager *xm,
                             const struct vy_read_view *read_view)
{
        struct vy_read_view *rv = (struct vy_read_view *) read_view;
        if (rv == xm->p_global_read_view)
                return;
        assert(rv->refs);
        if (--rv->refs == 0) {
                rlist_del_entry(rv, in_read_views);
                mempool_free(&xm->read_view_mempool, rv);
        }
}

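/**
 * Return the view LSN of the oldest read view or, if there are
 * no read views, the LSN of the last committed transaction.
 */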
int64_t
tx_manager_vlsn(struct tx_manager *xm)
{
        if (rlist_empty(&xm->read_views))
                return xm->lsn;
        struct vy_read_view *oldest = rlist_first_entry(&xm->read_views,
                                                        struct vy_read_view,
                                                        in_read_views);
        return oldest->vlsn;
}

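/** Allocate and initialize a write set element (txv) for a statement. */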
static struct txv *
txv_new(struct vy_tx *tx, struct vy_index *index, struct tuple *stmt)
{
        struct txv *v = mempool_alloc(&tx->xm->txv_mempool);
        if (v == NULL) {
                diag_set(OutOfMemory, sizeof(*v), "mempool", "struct txv");
                return NULL;
        }
        v->index = index;
        vy_index_ref(v->index);
        v->mem = NULL;
        v->stmt = stmt;
        tuple_ref(stmt);
        v->region_stmt = NULL;
        v->tx = tx;
        v->is_overwritten = false;
        v->overwritten = NULL;
        return v;
}

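/** Free a write set element and release the statement it refers to. */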
static void
txv_delete(struct txv *v)
{
        tuple_unref(v->stmt);
        vy_index_unref(v->index);
        mempool_free(&v->tx->xm->txv_mempool, v);
}

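/**
 * Allocate a read interval covering the range between @left and
 * @right in the given index. The *_belongs flags tell whether the
 * corresponding boundary is included in the interval.
 */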
static struct vy_read_interval *
vy_read_interval_new(struct vy_tx *tx, struct vy_index *index,
                     struct tuple *left, bool left_belongs,
                     struct tuple *right, bool right_belongs)
{
        struct vy_read_interval *interval;
        interval = mempool_alloc(&tx->xm->read_interval_mempool);
        if (interval == NULL) {
                diag_set(OutOfMemory, sizeof(*interval),
                         "mempool", "struct vy_read_interval");
                return NULL;
        }
        interval->tx = tx;
        vy_index_ref(index);
        interval->index = index;
        tuple_ref(left);
        interval->left = left;
        interval->left_belongs = left_belongs;
        tuple_ref(right);
        interval->right = right;
        interval->right_belongs = right_belongs;
        interval->subtree_last = NULL;
        return interval;
}

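/** Free a read interval and unreference the tuples it pins. */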
static void
vy_read_interval_delete(struct vy_read_interval *interval)
{
        vy_index_unref(interval->index);
        tuple_unref(interval->left);
        tuple_unref(interval->right);
        mempool_free(&interval->tx->xm->read_interval_mempool, interval);
}

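/**
 * Callback invoked on transaction destruction to remove each read
 * interval from the index read set and free it.
 */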
static struct vy_read_interval *
vy_tx_read_set_free_cb(vy_tx_read_set_t *read_set,
                       struct vy_read_interval *interval, void *arg)
{
        (void)arg;
        (void)read_set;
        vy_index_read_set_remove(&interval->index->read_set, interval);
        vy_read_interval_delete(interval);
        return NULL;
}

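/** Initialize a transaction allocated by the caller. */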
void
vy_tx_create(struct tx_manager *xm, struct vy_tx *tx)
{
        stailq_create(&tx->log);
        write_set_new(&tx->write_set);
        tx->write_set_version = 0;
        tx->write_size = 0;
        tx->xm = xm;
        tx->state = VINYL_TX_READY;
        tx->read_view = (struct vy_read_view *)xm->p_global_read_view;
        vy_tx_read_set_new(&tx->read_set);
        tx->psn = 0;
        rlist_create(&tx->on_destroy);
        xm->stat.active++;
}

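/**
 * Destroy a transaction: run its on_destroy triggers and release
 * its read view, write set log and read set.
 */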
void
vy_tx_destroy(struct vy_tx *tx)
{
        trigger_run(&tx->on_destroy, NULL);
        trigger_destroy(&tx->on_destroy);

        tx_manager_destroy_read_view(tx->xm, tx->read_view);

        struct txv *v, *tmp;
        stailq_foreach_entry_safe(v, tmp, &tx->log, next_in_log) {
                vy_stmt_counter_unacct_tuple(&v->index->stat.txw.count,
                                             v->stmt);
                txv_delete(v);
        }

        vy_tx_read_set_iter(&tx->read_set, NULL, vy_tx_read_set_free_cb, NULL);

        tx->xm->stat.active--;
}

/** Return true if the transaction is read-only. */
static bool
vy_tx_is_ro(struct vy_tx *tx)
{
        return write_set_empty(&tx->write_set);
}

/** Return true if the transaction is in read view. */
static bool
vy_tx_is_in_read_view(struct vy_tx *tx)
{
        return tx->read_view->vlsn != INT64_MAX;
}

/**
 * Send to read view all transactions that are reading key @v
 * modified by transaction @tx.
 */
static int
vy_tx_send_to_read_view(struct vy_tx *tx, struct txv *v)
{
        struct vy_tx_conflict_iterator it;
        vy_tx_conflict_iterator_init(&it, &v->index->read_set, v->stmt);
        struct vy_tx *abort;
        while ((abort = vy_tx_conflict_iterator_next(&it)) != NULL) {
                /* Don't abort self. */
                if (abort == tx)
                        continue;
                /* Abort only active TXs */
                if (abort->state != VINYL_TX_READY)
                        continue;
                /* already in (earlier) read view */
                if (vy_tx_is_in_read_view(abort))
                        continue;
                struct vy_read_view *rv = tx_manager_read_view(tx->xm);
                if (rv == NULL)
                        return -1;
                abort->read_view = rv;
        }
        return 0;
}

/**
 * Abort all transactions that are reading key @v modified
 * by transaction @tx.
 */
static void
vy_tx_abort_readers(struct vy_tx *tx, struct txv *v)
{
        struct vy_tx_conflict_iterator it;
        vy_tx_conflict_iterator_init(&it, &v->index->read_set, v->stmt);
        struct vy_tx *abort;
        while ((abort = vy_tx_conflict_iterator_next(&it)) != NULL) {
                /* Don't abort self. */
                if (abort == tx)
                        continue;
                /* Abort only active TXs */
                if (abort->state != VINYL_TX_READY)
                        continue;
                abort->state = VINYL_TX_ABORT;
        }
}

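/** Allocate and begin a new transaction. */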
struct vy_tx *
vy_tx_begin(struct tx_manager *xm)
{
        struct vy_tx *tx = mempool_alloc(&xm->tx_mempool);
        if (unlikely(tx == NULL)) {
                diag_set(OutOfMemory, sizeof(*tx), "mempool", "struct vy_tx");
                return NULL;
        }
        vy_tx_create(xm, tx);
        return tx;
}

/**
 * Rotate the active in-memory tree if necessary and pin it to make
 * sure it is not dumped until the transaction is complete.
 */
static int
vy_tx_write_prepare(struct txv *v)
{
        struct vy_index *index = v->index;

        /*
         * Allocate a new in-memory tree if either of the following
         * conditions is true:
         *
         * - Generation has increased after the tree was created.
         *   In this case we need to dump the tree as is in order to
         *   guarantee dump consistency.
         *
         * - Schema version has increased after the tree was created.
         *   We have to seal the tree, because we don't support mixing
         *   statements of different formats in the same tree.
         */
        if (unlikely(index->mem->schema_version != schema_version ||
                     index->mem->generation != *index->env->p_generation)) {
                if (vy_index_rotate_mem(index) != 0)
                        return -1;
        }
        vy_mem_pin(index->mem);
        v->mem = index->mem;
        return 0;
}

/**
 * Write a single statement into an index. If the statement has
 * an lsregion copy then use it, else create it.
 *
 * @param index       Index to write to.
 * @param mem         In-memory tree to write to.
 * @param stmt        Statement allocated with malloc().
 * @param region_stmt NULL or the same statement as stmt,
 *                    but allocated on lsregion.
 *
 * @retval  0 Success.
 * @retval -1 Memory error.
 */
static int
vy_tx_write(struct vy_index *index, struct vy_mem *mem,
            struct tuple *stmt, const struct tuple **region_stmt)
{
        assert(vy_stmt_is_refable(stmt));
        assert(*region_stmt == NULL || !vy_stmt_is_refable(*region_stmt));

        /*
         * The UPSERT statement can be applied to the cached
         * statement, because the cache always contains only
         * newest REPLACE statements. In such a case the UPSERT,
         * applied to the cached statement, can be inserted
         * instead of the original UPSERT.
         */
        if (vy_stmt_type(stmt) == IPROTO_UPSERT) {
                struct tuple *deleted = NULL;
                /* Invalidate cache element. */
                vy_cache_on_write(&index->cache, stmt, &deleted);
                if (deleted != NULL) {
                        struct tuple *applied =
                                vy_apply_upsert(stmt, deleted, mem->cmp_def,
                                                mem->format, mem->upsert_format,
                                                false);
                        tuple_unref(deleted);
                        if (applied != NULL) {
                                assert(vy_stmt_type(applied) == IPROTO_REPLACE);
                                int rc = vy_index_set(index, mem, applied,
                                                      region_stmt);
                                tuple_unref(applied);
                                return rc;
                        }
                        /*
                         * Ignore a memory error, because it is
                         * not critical to apply the optimization.
                         */
                }
        } else {
                /* Invalidate cache element. */
                vy_cache_on_write(&index->cache, stmt, NULL);
        }
        return vy_index_set(index, mem, stmt, region_stmt);
}

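/**
 * Prepare a transaction for commit: check it for conflicts,
 * assign it a prepare sequence number, send active readers of the
 * keys it modified to a read view and write its statements to the
 * in-memory trees of the indexes involved.
 */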
int
vy_tx_prepare(struct vy_tx *tx)
{
        struct tx_manager *xm = tx->xm;

        if (vy_tx_is_ro(tx)) {
                assert(tx->state == VINYL_TX_READY);
                tx->state = VINYL_TX_COMMIT;
                return 0;
        }

        if (vy_tx_is_in_read_view(tx) || tx->state == VINYL_TX_ABORT) {
                xm->stat.conflict++;
                diag_set(ClientError, ER_TRANSACTION_CONFLICT);
                return -1;
        }

        assert(tx->state == VINYL_TX_READY);
        tx->state = VINYL_TX_COMMIT;

        assert(tx->read_view == &xm->global_read_view);
        tx->psn = ++xm->psn;

        /** Send to read view read/write intersection. */
        struct txv *v;
        struct write_set_iterator it;
        write_set_ifirst(&tx->write_set, &it);
        while ((v = write_set_inext(&it)) != NULL) {
                if (vy_tx_send_to_read_view(tx, v))
                        return -1;
        }

        /*
         * Flush transactional changes to the index.
         * Sic: the loop below must not yield after recovery.
         */
        /* repsert - REPLACE/UPSERT */
        const struct tuple *delete = NULL, *repsert = NULL;
        MAYBE_UNUSED uint32_t current_space_id = 0;
        stailq_foreach_entry(v, &tx->log, next_in_log) {
                struct vy_index *index = v->index;
                if (index->id == 0) {
                        /* The beginning of the new txn_stmt is met. */
                        current_space_id = index->space_id;
                        repsert = NULL;
                        delete = NULL;
                }
                assert(index->space_id == current_space_id);

                /* Do not save statements that were overwritten by the same tx */
                if (v->is_overwritten)
                        continue;

                if (vy_tx_write_prepare(v) != 0)
                        return -1;
                assert(v->mem != NULL);

                /* In secondary indexes only REPLACE/DELETE can be written. */
                vy_stmt_set_lsn(v->stmt, MAX_LSN + tx->psn);
                enum iproto_type type = vy_stmt_type(v->stmt);
                const struct tuple **region_stmt =
                        (type == IPROTO_DELETE) ? &delete : &repsert;
                if (vy_tx_write(index, v->mem, v->stmt, region_stmt) != 0)
                        return -1;
                v->region_stmt = *region_stmt;
        }
        xm->last_prepared_tx = tx;
        return 0;
}

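/**
 * Commit a prepared transaction: assign the final LSN to its
 * statements, unpin the in-memory trees it wrote to and free it.
 */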
void
vy_tx_commit(struct vy_tx *tx, int64_t lsn)
{
        assert(tx->state == VINYL_TX_COMMIT);
        struct tx_manager *xm = tx->xm;

        xm->stat.commit++;

        if (xm->last_prepared_tx == tx)
                xm->last_prepared_tx = NULL;

        if (vy_tx_is_ro(tx))
                goto out;

        assert(xm->lsn < lsn);
        xm->lsn = lsn;

        /* Fix LSNs of the records and commit changes. */
        struct txv *v;
        stailq_foreach_entry(v, &tx->log, next_in_log) {
                if (v->region_stmt != NULL) {
                        vy_stmt_set_lsn((struct tuple *)v->region_stmt, lsn);
                        vy_index_commit_stmt(v->index, v->mem, v->region_stmt);
                }
                if (v->mem != NULL)
                        vy_mem_unpin(v->mem);
        }

        /* Update read views of dependent transactions. */
        if (tx->read_view != &xm->global_read_view)
                tx->read_view->vlsn = lsn;
out:
        vy_tx_destroy(tx);
        mempool_free(&xm->tx_mempool, tx);
}

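/**
 * Undo a transaction that has already been prepared: roll back
 * its statements and abort the read views and the active readers
 * that depend on it.
 */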
static void
vy_tx_rollback_after_prepare(struct vy_tx *tx)
{
        assert(tx->state == VINYL_TX_COMMIT);

        struct tx_manager *xm = tx->xm;

        /*
         * There are two reasons for rollback_after_prepare:
         * 1) A failure in the middle of a vy_tx_prepare call.
         * 2) A cascading rollback after a WAL failure.
         *
         * If a TX is the latest prepared TX and it is rolled back,
         * it is certainly case (2) and we should set
         * xm->last_prepared_tx to the previous prepared TX, if any.
         * But we don't know the previous TX.
         * On the other hand, we may expect that a cascading rollback
         * will concern all the prepared TXs: all of them will be
         * rolled back and xm->last_prepared_tx must be set to NULL
         * in the end. Thus we can set xm->last_prepared_tx to NULL
         * now and it will be correct at the end of the cascading
         * rollback.
         *
         * We must not change xm->last_prepared_tx in any other case;
         * it will be changed by the corresponding TX.
         */
        if (xm->last_prepared_tx == tx)
                xm->last_prepared_tx = NULL;

        struct txv *v;
        stailq_foreach_entry(v, &tx->log, next_in_log) {
                if (v->region_stmt != NULL)
                        vy_index_rollback_stmt(v->index, v->mem,
                                               v->region_stmt);
                if (v->mem != NULL)
                        vy_mem_unpin(v->mem);
        }

        /* Abort read views of dependent transactions. */
        if (tx->read_view != &xm->global_read_view)
                tx->read_view->is_aborted = true;

        struct write_set_iterator it;
        write_set_ifirst(&tx->write_set, &it);
        while ((v = write_set_inext(&it)) != NULL) {
                vy_tx_abort_readers(tx, v);
        }
}

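/** Roll back and free a transaction. */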
void
vy_tx_rollback(struct vy_tx *tx)
{
        struct tx_manager *xm = tx->xm;

        xm->stat.rollback++;

        if (tx->state == VINYL_TX_COMMIT)
                vy_tx_rollback_after_prepare(tx);

        vy_tx_destroy(tx);
        mempool_free(&xm->tx_mempool, tx);
}

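/**
 * Undo all changes made by a transaction after the given
 * savepoint, restoring the statements it overwrote.
 */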
void
vy_tx_rollback_to_savepoint(struct vy_tx *tx, void *svp)
{
        assert(tx->state == VINYL_TX_READY);
        struct stailq_entry *last = svp;
        /* Start from the first statement after the savepoint. */
        last = last == NULL ? stailq_first(&tx->log) : stailq_next(last);
        if (last == NULL) {
                /* Empty transaction or no changes after the savepoint. */
                return;
        }
        struct stailq tail;
        stailq_create(&tail);
        stailq_splice(&tx->log, last, &tail);
        /* Rollback statements in LIFO order. */
        stailq_reverse(&tail);
        struct txv *v, *tmp;
        stailq_foreach_entry_safe(v, tmp, &tail, next_in_log) {
                write_set_remove(&tx->write_set, v);
                if (v->overwritten != NULL) {
                        /* Restore overwritten statement. */
                        write_set_insert(&tx->write_set, v->overwritten);
                        v->overwritten->is_overwritten = false;
                }
                tx->write_set_version++;
                txv_delete(v);
        }
}

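/**
 * Remember that a transaction read the given key range so that
 * conflicting writes can send it to a read view. Intervals that
 * intersect the new one are merged into it.
 */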
int
vy_tx_track(struct vy_tx *tx, struct vy_index *index,
            struct tuple *left, bool left_belongs,
            struct tuple *right, bool right_belongs)
{
        if (vy_tx_is_in_read_view(tx)) {
                /* No point in tracking reads. */
                return 0;
        }

        struct vy_read_interval *new_interval;
        new_interval = vy_read_interval_new(tx, index, left, left_belongs,
                                            right, right_belongs);
        if (new_interval == NULL)
                return -1;

        /*
         * Search for intersections in the transaction read set.
         */
        struct stailq merge;
        stailq_create(&merge);

        struct vy_tx_read_set_iterator it;
        vy_tx_read_set_isearch_le(&tx->read_set, new_interval, &it);

        struct vy_read_interval *interval;
        interval = vy_tx_read_set_inext(&it);
        if (interval != NULL && interval->index == index) {
                if (vy_read_interval_cmpr(interval, new_interval) >= 0) {
                        /*
                         * There is an interval in the tree spanning
                         * the new interval. Nothing to do.
                         */
                        vy_read_interval_delete(new_interval);
                        return 0;
                }
                if (vy_read_interval_should_merge(interval, new_interval))
                        stailq_add_tail_entry(&merge, interval, in_merge);
        }

        if (interval == NULL)
                vy_tx_read_set_isearch_gt(&tx->read_set, new_interval, &it);

        while ((interval = vy_tx_read_set_inext(&it)) != NULL &&
               interval->index == index &&
               vy_read_interval_should_merge(new_interval, interval))
                stailq_add_tail_entry(&merge, interval, in_merge);

        /*
         * Merge intersecting intervals with the new interval and
         * remove them from the transaction and index read sets.
         */
        if (!stailq_empty(&merge)) {
                interval = stailq_first_entry(&merge, struct vy_read_interval,
                                              in_merge);
                if (vy_read_interval_cmpl(new_interval, interval) > 0) {
                        tuple_ref(interval->left);
                        tuple_unref(new_interval->left);
                        new_interval->left = interval->left;
                        new_interval->left_belongs = interval->left_belongs;
                }
                interval = stailq_last_entry(&merge, struct vy_read_interval,
                                             in_merge);
                if (vy_read_interval_cmpr(new_interval, interval) < 0) {
                        tuple_ref(interval->right);
                        tuple_unref(new_interval->right);
                        new_interval->right = interval->right;
                        new_interval->right_belongs = interval->right_belongs;
                }
                struct vy_read_interval *next_interval;
                stailq_foreach_entry_safe(interval, next_interval, &merge,
                                          in_merge) {
                        vy_tx_read_set_remove(&tx->read_set, interval);
                        vy_index_read_set_remove(&index->read_set, interval);
                        vy_read_interval_delete(interval);
                }
        }

        vy_tx_read_set_insert(&tx->read_set, new_interval);
        vy_index_read_set_insert(&index->read_set, new_interval);
        return 0;
}

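/**
 * Track a point read. A read of a key the transaction itself
 * replaced or deleted is serializable and need not be tracked.
 */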
int
vy_tx_track_point(struct vy_tx *tx, struct vy_index *index,
                  struct tuple *stmt)
{
        assert(tuple_field_count(stmt) >= index->cmp_def->part_count);

        if (vy_tx_is_in_read_view(tx)) {
                /* No point in tracking reads. */
                return 0;
        }

        struct txv *v = write_set_search_key(&tx->write_set, index, stmt);
        if (v != NULL && (vy_stmt_type(v->stmt) == IPROTO_REPLACE ||
                          vy_stmt_type(v->stmt) == IPROTO_DELETE)) {
                /* Reading from own write set is serializable. */
                return 0;
        }

        return vy_tx_track(tx, index, stmt, true, stmt, true);
}

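/**
 * Add a statement to the transaction write set, applying it to a
 * previously written statement of the same key if the new
 * statement is an UPSERT.
 */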
int
vy_tx_set(struct vy_tx *tx, struct vy_index *index, struct tuple *stmt)
{
        assert(vy_stmt_type(stmt) != 0);
        /**
         * A statement in the write set must have a unique lsn in
         * order to distinguish it from cacheable statements in mem
         * and run.
         */
        vy_stmt_set_lsn(stmt, INT64_MAX);
        struct tuple *applied = NULL;

        /* Update concurrent index */
        struct txv *old = write_set_search_key(&tx->write_set, index, stmt);
        /* Found a match of the previous action of this transaction */
        if (old != NULL && vy_stmt_type(stmt) == IPROTO_UPSERT) {
                assert(index->id == 0);
                uint8_t old_type = vy_stmt_type(old->stmt);
                assert(old_type == IPROTO_UPSERT ||
                       old_type == IPROTO_REPLACE ||
                       old_type == IPROTO_DELETE);
                (void) old_type;

                applied = vy_apply_upsert(stmt, old->stmt, index->cmp_def,
                                          index->mem_format,
                                          index->upsert_format, true);
                index->stat.upsert.applied++;
                if (applied == NULL)
                        return -1;
                stmt = applied;
                assert(vy_stmt_type(stmt) != 0);
                index->stat.upsert.squashed++;
        }

        /* Allocate a MVCC container. */
        struct txv *v = txv_new(tx, index, stmt);
        if (applied != NULL)
                tuple_unref(applied);
        if (v == NULL)
                return -1;

        if (old != NULL) {
                /* Leave the old txv in the TX log but remove it from the write set. */
                assert(tx->write_size >= tuple_size(old->stmt));
                tx->write_count--;
                tx->write_size -= tuple_size(old->stmt);
                write_set_remove(&tx->write_set, old);
                old->is_overwritten = true;
        }

        if (old != NULL && vy_stmt_type(stmt) != IPROTO_UPSERT) {
                /*
                 * Inherit the column mask of the overwritten statement
                 * so as not to skip both statements on dump.
                 */
                uint64_t column_mask = vy_stmt_column_mask(stmt);
                if (column_mask != UINT64_MAX)
                        vy_stmt_set_column_mask(stmt, column_mask |
                                                vy_stmt_column_mask(old->stmt));
        }

        v->overwritten = old;
        write_set_insert(&tx->write_set, v);
        tx->write_set_version++;
        tx->write_count++;
        tx->write_size += tuple_size(stmt);
        vy_stmt_counter_acct_tuple(&index->stat.txw.count, stmt);
        stailq_add_tail_entry(&tx->log, v, next_in_log);
        return 0;
}

static struct vy_stmt_iterator_iface vy_txw_iterator_iface;

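/**
 * Initialize an iterator over the write set of a transaction for
 * the given index, iterator type and search key.
 */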
void
vy_txw_iterator_open(struct vy_txw_iterator *itr,
                     struct vy_txw_iterator_stat *stat,
                     struct vy_tx *tx, struct vy_index *index,
                     enum iterator_type iterator_type,
                     const struct tuple *key)
{
        itr->base.iface = &vy_txw_iterator_iface;
        itr->stat = stat;
        itr->tx = tx;
        itr->index = index;
        itr->iterator_type = iterator_type;
        itr->key = key;
        itr->version = UINT32_MAX;
        itr->curr_txv = NULL;
        itr->search_started = false;
}

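/** Return the statement the iterator is currently positioned at. */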
static void
vy_txw_iterator_get(struct vy_txw_iterator *itr, struct tuple **ret)
{
        *ret = itr->curr_txv->stmt;
        vy_stmt_counter_acct_tuple(&itr->stat->get, *ret);
}

/**
 * Position the iterator to the first entry in the transaction
 * write set satisfying the search criteria for a given key and
 * direction.
 */
static void
vy_txw_iterator_seek(struct vy_txw_iterator *itr,
                     enum iterator_type iterator_type,
                     const struct tuple *key)
{
        itr->stat->lookup++;
        itr->version = itr->tx->write_set_version;
        itr->curr_txv = NULL;
        struct vy_index *index = itr->index;
        struct write_set_key k = { index, key };
        struct txv *txv;
        if (tuple_field_count(key) > 0) {
                if (iterator_type == ITER_EQ)
                        txv = write_set_search(&itr->tx->write_set, &k);
                else if (iterator_type == ITER_GE || iterator_type == ITER_GT)
                        txv = write_set_nsearch(&itr->tx->write_set, &k);
                else
                        txv = write_set_psearch(&itr->tx->write_set, &k);
                if (txv == NULL || txv->index != index)
                        return;
                if (vy_stmt_compare(key, txv->stmt, index->cmp_def) == 0) {
                        while (true) {
                                struct txv *next;
                                if (iterator_type == ITER_LE ||
                                    iterator_type == ITER_GT)
                                        next = write_set_next(&itr->tx->write_set, txv);
                                else
                                        next = write_set_prev(&itr->tx->write_set, txv);
                                if (next == NULL || next->index != index)
                                        break;
                                if (vy_stmt_compare(key, next->stmt,
                                                    index->cmp_def) != 0)
                                        break;
                                txv = next;
                        }
                        if (iterator_type == ITER_GT)
                                txv = write_set_next(&itr->tx->write_set, txv);
                        else if (iterator_type == ITER_LT)
                                txv = write_set_prev(&itr->tx->write_set, txv);
                }
        } else if (iterator_type == ITER_LE) {
                txv = write_set_nsearch(&itr->tx->write_set, &k);
        } else {
                assert(iterator_type == ITER_GE);
                txv = write_set_psearch(&itr->tx->write_set, &k);
        }
        if (txv == NULL || txv->index != index)
                return;
        itr->curr_txv = txv;
}

/**
 * Advance an iterator to the next statement.
 * Always returns 0. On EOF, *ret is set to NULL.
 */
static NODISCARD int
vy_txw_iterator_next_key(struct vy_stmt_iterator *vitr, struct tuple **ret,
                         bool *stop)
{
        (void)stop;
        assert(vitr->iface->next_key == vy_txw_iterator_next_key);
        struct vy_txw_iterator *itr = (struct vy_txw_iterator *) vitr;
        *ret = NULL;

        if (!itr->search_started) {
                itr->search_started = true;
                vy_txw_iterator_seek(itr, itr->iterator_type, itr->key);
                goto out;
        }
        itr->version = itr->tx->write_set_version;
        if (itr->curr_txv == NULL)
                return 0;
        if (itr->iterator_type == ITER_LE || itr->iterator_type == ITER_LT)
                itr->curr_txv = write_set_prev(&itr->tx->write_set, itr->curr_txv);
        else
                itr->curr_txv = write_set_next(&itr->tx->write_set, itr->curr_txv);
        if (itr->curr_txv != NULL && itr->curr_txv->index != itr->index)
                itr->curr_txv = NULL;
        if (itr->curr_txv != NULL && itr->iterator_type == ITER_EQ &&
            vy_stmt_compare(itr->key, itr->curr_txv->stmt,
                            itr->index->cmp_def) != 0)
                itr->curr_txv = NULL;
out:
        if (itr->curr_txv != NULL)
                vy_txw_iterator_get(itr, ret);
        return 0;
}

/**
 * This function does nothing. It is only needed to conform
 * to the common iterator interface.
 */
static NODISCARD int
vy_txw_iterator_next_lsn(struct vy_stmt_iterator *vitr, struct tuple **ret)
{
        assert(vitr->iface->next_lsn == vy_txw_iterator_next_lsn);
        (void)vitr;
        *ret = NULL;
        return 0;
}

/**
 * Restore the iterator position after a change in the write set.
 * The iterator is positioned to the statement following @last_stmt.
 * Returns 1 if the iterator position changed, 0 otherwise.
 */
static NODISCARD int
vy_txw_iterator_restore(struct vy_stmt_iterator *vitr,
                        const struct tuple *last_stmt,
                        struct tuple **ret, bool *stop)
{
        (void)stop;

        assert(vitr->iface->restore == vy_txw_iterator_restore);
        struct vy_txw_iterator *itr = (struct vy_txw_iterator *) vitr;

        assert(itr->search_started);
        if (itr->version == itr->tx->write_set_version)
                return 0;

        const struct tuple *key = itr->key;
        enum iterator_type iterator_type = itr->iterator_type;
        if (last_stmt != NULL) {
                key = last_stmt;
                iterator_type = iterator_direction(iterator_type) > 0 ?
                                ITER_GT : ITER_LT;
        }

        struct txv *prev_txv = itr->curr_txv;
        vy_txw_iterator_seek(itr, iterator_type, key);

        if (itr->iterator_type == ITER_EQ && itr->curr_txv != NULL &&
            vy_stmt_compare(itr->key, itr->curr_txv->stmt,
                            itr->index->cmp_def) != 0)
                itr->curr_txv = NULL;

        if (prev_txv == itr->curr_txv)
                return 0;

        *ret = NULL;
        if (itr->curr_txv != NULL)
                vy_txw_iterator_get(itr, ret);
        return 1;
}

/**
 * Close a txw iterator.
 */
static void
vy_txw_iterator_close(struct vy_stmt_iterator *vitr)
{
        assert(vitr->iface->close == vy_txw_iterator_close);
        struct vy_txw_iterator *itr = (struct vy_txw_iterator *) vitr;
        (void)itr; /* suppress warn if NDEBUG */
        TRASH(itr);
}

static struct vy_stmt_iterator_iface vy_txw_iterator_iface = {
        .next_key = vy_txw_iterator_next_key,
        .next_lsn = vy_txw_iterator_next_lsn,
        .restore = vy_txw_iterator_restore,
        .close = vy_txw_iterator_close
};