• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / tarantool / 9433

pending completion
9433

push

travis-ci

locker
box: implement persistent sequences

This patch implements a new object type, persistent sequences. Sequences
are created with function box.schema.sequence.create(name, options).
Options include min/max, start value, increment, cache size, just like
in Postgresql, although 'cache' is ignored for now. All sequences can be
accessed via box.sequence.<name>, similarly to spaces. To generate a
sequence value, use seq:next() method. To retrieve the last generated
value, use seq:get(). A sequence value can also be reset to the start
value or to any other value using seq:reset() and seq:set() methods.

Needed for #389

268 of 268 new or added lines in 9 files covered. (100.0%)

32846 of 37402 relevant lines covered (87.82%)

1100205.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.13
/src/box/txn.cc
1
/*
2
 * Copyright 2010-2016, Tarantool AUTHORS, please see AUTHORS file.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 * 1. Redistributions of source code must retain the above
9
 *    copyright notice, this list of conditions and the
10
 *    following disclaimer.
11
 *
12
 * 2. Redistributions in binary form must reproduce the above
13
 *    copyright notice, this list of conditions and the following
14
 *    disclaimer in the documentation and/or other materials
15
 *    provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
18
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
19
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
21
 * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
22
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
25
 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
28
 * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
29
 * SUCH DAMAGE.
30
 */
31
#include "txn.h"
32
#include "engine.h"
33
#include "tuple.h"
34
#include "journal.h"
35
#include <fiber.h>
36
#include "xrow.h"
37

38
enum {
39
        /**
40
         * Maximum recursion depth for on_replace triggers.
41
         * Large numbers may corrupt C stack.
42
         */
43
        TXN_SUB_STMT_MAX = 3
44
};
45

46
double too_long_threshold;
47

48
static inline void
49
fiber_set_txn(struct fiber *fiber, struct txn *txn)
8,395,392✔
50
{
51
        fiber_set_key(fiber, FIBER_KEY_TXN, (void *) txn);
8,395,392✔
52
}
8,395,392✔
53

54
static void
55
txn_add_redo(struct txn_stmt *stmt, struct request *request)
4,234,504✔
56
{
57
        stmt->row = request->header;
4,234,504✔
58
        if (request->header != NULL)
4,234,504✔
59
                return;
4,641,662✔
60

61
        /* Create a redo log row for Lua requests */
62
        struct xrow_header *row;
63
        row = region_alloc_object_xc(&fiber()->gc, struct xrow_header);
3,827,346✔
64
        /* Initialize members explicitly to save time on memset() */
65
        row->type = request->type;
3,827,346✔
66
        row->replica_id = 0;
3,827,346✔
67
        row->lsn = 0;
3,827,346✔
68
        row->sync = 0;
3,827,346✔
69
        row->tm = 0;
3,827,346✔
70
        row->bodycnt = xrow_encode_dml_xc(request, row->body);
3,827,346✔
71
        stmt->row = row;
3,827,346✔
72
}
73

74
/** Initialize a new stmt object within txn. */
75
static struct txn_stmt *
76
txn_stmt_new(struct txn *txn)
4,262,033✔
77
{
78
        struct txn_stmt *stmt;
79
        stmt = region_alloc_object_xc(&fiber()->gc, struct txn_stmt);
4,262,033✔
80

81
        /* Initialize members explicitly to save time on memset() */
82
        stmt->space = NULL;
4,262,033✔
83
        stmt->old_tuple = NULL;
4,262,033✔
84
        stmt->new_tuple = NULL;
4,262,033✔
85
        stmt->engine_savepoint = NULL;
4,262,033✔
86
        stmt->row = NULL;
4,262,033✔
87

88
        stailq_add_tail_entry(&txn->stmts, stmt, next);
4,262,033✔
89
        ++txn->in_sub_stmt;
4,262,033✔
90
        return stmt;
4,262,033✔
91
}
92

93
/** txn->space_after_commit callback function. */
94
static void
95
txn_space_after_commit(struct trigger * /* trigger */, void *event)
45✔
96
{
97
        struct txn *txn = (struct txn *) event;
45✔
98
        struct txn_stmt *stmt;
99
        stailq_foreach_entry(stmt, &txn->stmts, next) {
91✔
100
                if (stmt->space->run_triggers &&
46✔
101
                    (stmt->old_tuple || stmt->new_tuple))
18✔
102
                        trigger_run(&stmt->space->after_commit, stmt);
46✔
103
        }
104
}
45✔
105

106
struct txn *
107
txn_begin(bool is_autocommit)
4,197,697✔
108
{
109
        static int64_t txn_id = 0;
110
        assert(! in_txn());
4,197,697✔
111
        struct txn *txn = region_alloc_object_xc(&fiber()->gc, struct txn);
4,197,697✔
112
        /* Initialize members explicitly to save time on memset() */
113
        stailq_create(&txn->stmts);
4,197,697✔
114
        txn->n_rows = 0;
4,197,697✔
115
        txn->is_autocommit = is_autocommit;
4,197,697✔
116
        txn->has_triggers  = false;
4,197,697✔
117
        txn->in_sub_stmt = 0;
4,197,697✔
118
        txn->id = ++txn_id;
4,197,697✔
119
        txn->signature = -1;
4,197,697✔
120
        txn->engine = NULL;
4,197,697✔
121
        txn->engine_tx = NULL;
4,197,697✔
122
        trigger_create(&txn->space_after_commit,
123
                       txn_space_after_commit, NULL, NULL);
4,197,697✔
124
        /* fiber_on_yield/fiber_on_stop initialized by engine on demand */
125
        fiber_set_txn(fiber(), txn);
4,197,697✔
126
        return txn;
4,197,697✔
127
}
128

129
void
130
txn_begin_in_engine(Engine *engine, struct txn *txn)
4,265,248✔
131
{
132
        if (txn->engine == NULL) {
4,265,248✔
133
                assert(stailq_empty(&txn->stmts));
4,196,551✔
134
                txn->engine = engine;
4,196,551✔
135
                engine->begin(txn);
4,196,551✔
136
        } else if (txn->engine != engine) {
68,697✔
137
                /**
138
                 * Only one engine can be used in
139
                 * a multi-statement transaction currently.
140
                 */
141
                tnt_raise(ClientError, ER_CROSS_ENGINE_TRANSACTION);
×
142
        }
143
}
4,265,248✔
144

145
struct txn *
146
txn_begin_stmt(struct space *space)
4,262,034✔
147
{
148
        struct txn *txn = in_txn();
4,262,034✔
149
        if (txn == NULL)
4,262,034✔
150
                txn = txn_begin(true);
4,189,447✔
151
        else if (txn->in_sub_stmt > TXN_SUB_STMT_MAX)
72,587✔
152
                tnt_raise(ClientError, ER_SUB_STMT_MAX);
1✔
153

154
        trigger_run(&space->on_stmt_begin, txn);
4,262,033✔
155
        Engine *engine = space->handler->engine;
4,262,033✔
156
        txn_begin_in_engine(engine, txn);
4,262,033✔
157
        struct txn_stmt *stmt = txn_stmt_new(txn);
4,262,033✔
158
        stmt->space = space;
4,262,033✔
159

160
        engine->beginStatement(txn);
4,262,033✔
161
        return txn;
4,262,033✔
162
}
163

164
/**
165
 * End a statement. In autocommit mode, end
166
 * the current transaction as well.
167
 */
168
void
169
txn_commit_stmt(struct txn *txn, struct request *request)
4,244,541✔
170
{
171
        assert(txn->in_sub_stmt > 0);
4,244,541✔
172
        /*
173
         * Run on_replace triggers. For now, disallow mutation
174
         * of tuples in the trigger.
175
         */
176
        struct txn_stmt *stmt = stailq_last_entry(&txn->stmts,
4,244,541✔
177
                                                  struct txn_stmt, next);
178

179
        /* Create WAL record for the write requests in non-temporary spaces */
180
        if (!space_is_temporary(stmt->space)) {
4,244,541✔
181
                txn_add_redo(stmt, request);
4,234,504✔
182
                ++txn->n_rows;
4,234,504✔
183
        }
184
        /*
185
         * If there are triggers, and they are not disabled, and
186
         * the statement found any rows, run triggers.
187
         * XXX:
188
         * - vinyl doesn't set old/new tuple, so triggers don't
189
         *   work for it
190
         * - perhaps we should run triggers even for deletes which
191
         *   doesn't find any rows
192
         */
193
        if (stmt->space->run_triggers && (stmt->old_tuple || stmt->new_tuple)) {
4,244,541✔
194
                /*
195
                 * If the space has after_commit triggers install a trigger
196
                 * to run them from txn_commit().
197
                 */
198
                if (!rlist_empty(&stmt->space->after_commit) &&
3,746,972✔
199
                    rlist_empty(&txn->space_after_commit.link))
56✔
200
                        txn_on_commit(txn, &txn->space_after_commit);
54✔
201

202
                if (!rlist_empty(&stmt->space->on_replace))
3,746,916✔
203
                        trigger_run(&stmt->space->on_replace, txn);
382,562✔
204
        }
205
        --txn->in_sub_stmt;
4,244,160✔
206
        if (txn->is_autocommit && txn->in_sub_stmt == 0)
4,244,160✔
207
                txn_commit(txn);
4,188,716✔
208
}
4,244,099✔
209

210

211
static int64_t
212
txn_write_to_wal(struct txn *txn)
4,184,967✔
213
{
214
        assert(txn->n_rows > 0);
4,184,967✔
215

216
        struct journal_entry *req = journal_entry_new(txn->n_rows);
4,184,967✔
217
        if (req == NULL)
4,184,967✔
218
                diag_raise();
×
219

220
        struct txn_stmt *stmt;
221
        struct xrow_header **row = req->rows;
4,184,967✔
222
        stailq_foreach_entry(stmt, &txn->stmts, next) {
8,434,215✔
223
                if (stmt->row == NULL)
4,249,248✔
224
                        continue; /* A read (e.g. select) request */
17,104✔
225
                *row++ = stmt->row;
4,232,144✔
226
        }
227
        assert(row == req->rows + req->n_rows);
4,184,967✔
228

229
        ev_tstamp start = ev_monotonic_now(loop());
4,184,967✔
230
        int64_t res = journal_write(req);
4,184,967✔
231

232
        ev_tstamp stop = ev_monotonic_now(loop());
4,184,967✔
233
        if (stop - start > too_long_threshold)
4,184,967✔
234
                say_warn("too long WAL write: %.3f sec", stop - start);
×
235
        if (res < 0) {
4,184,967✔
236
                /* Cascading rollback. */
237
                txn_rollback(); /* Perform our part of cascading rollback. */
161✔
238
                /*
239
                 * Move fiber to end of event loop to avoid
240
                 * execution of any new requests before all
241
                 * pending rollbacks are processed.
242
                 */
243
                fiber_reschedule();
161✔
244
                tnt_raise(LoggedError, ER_WAL_IO);
161✔
245
        }
246
        /*
247
         * Use vclock_sum() from WAL writer as transaction signature.
248
         */
249
        return res;
4,184,806✔
250
}
251

252
void
253
txn_commit(struct txn *txn)
4,195,774✔
254
{
255
        assert(txn == in_txn());
4,195,774✔
256

257
        assert(stailq_empty(&txn->stmts) || txn->engine);
4,195,774✔
258

259
        /* Do transaction conflict resolving */
260
        if (txn->engine) {
4,195,774✔
261
                txn->engine->prepare(txn);
4,195,640✔
262

263
                if (txn->n_rows > 0)
4,195,431✔
264
                        txn->signature = txn_write_to_wal(txn);
4,184,967✔
265
                /*
266
                 * The transaction is in the binary log. No action below
267
                 * may throw. In case an error has happened, there is
268
                 * no other option but terminate.
269
                 */
270
                if (txn->has_triggers)
4,195,270✔
271
                        trigger_run(&txn->on_commit, txn);
379,992✔
272

273
                txn->engine->commit(txn);
4,195,270✔
274
        }
275
        TRASH(txn);
4,195,404✔
276
        /** Free volatile txn memory. */
277
        fiber_gc();
4,195,404✔
278
        fiber_set_txn(fiber(), NULL);
4,195,404✔
279
}
4,195,404✔
280

281
/**
282
 * Void all effects of the statement, but
283
 * keep it in the list - to maintain
284
 * limit on the number of statements in a
285
 * transaction.
286
 */
287
void
288
txn_rollback_stmt()
18,080✔
289
{
290
        struct txn *txn = in_txn();
18,080✔
291
        if (txn == NULL)
18,080✔
292
                return;
215✔
293
        if (txn->is_autocommit)
17,865✔
294
                return txn_rollback();
736✔
295
        if (txn->in_sub_stmt == 0)
17,129✔
296
                return;
×
297
        struct txn_stmt *stmt = stailq_last_entry(&txn->stmts, struct txn_stmt,
17,129✔
298
                                                  next);
299
        txn->engine->rollbackStatement(txn, stmt);
17,129✔
300
        if (stmt->row != NULL) {
17,129✔
301
                stmt->row = NULL;
10✔
302
                --txn->n_rows;
10✔
303
                assert(txn->n_rows >= 0);
10✔
304
        }
305
        --txn->in_sub_stmt;
17,129✔
306
}
307

308
void
309
txn_rollback()
2,455✔
310
{
311
        struct txn *txn = in_txn();
2,455✔
312
        if (txn == NULL)
2,455✔
313
                return;
2,619✔
314
        if (txn->has_triggers)
2,291✔
315
                trigger_run(&txn->on_rollback, txn); /* must not throw. */
152✔
316
        if (txn->engine)
2,291✔
317
                txn->engine->rollback(txn);
1,280✔
318
        TRASH(txn);
2,291✔
319
        /** Free volatile txn memory. */
320
        fiber_gc();
2,291✔
321
        fiber_set_txn(fiber(), NULL);
2,291✔
322
}
323

324
void
325
txn_check_singlestatement(struct txn *txn, const char *where)
382,201✔
326
{
327
        if (!txn->is_autocommit ||
764,393✔
328
            stailq_last(&txn->stmts) != stailq_first(&txn->stmts)) {
382,192✔
329
                tnt_raise(ClientError, ER_UNSUPPORTED,
18✔
330
                          where, "multi-statement transactions");
331
        }
332
}
382,183✔
333

334
extern "C" {
335

336
int64_t
337
box_txn_id(void)
98✔
338
{
339
        struct txn *txn = in_txn();
98✔
340
        if (txn != NULL)
98✔
341
                return txn->id;
98✔
342
        else
343
                return -1;
×
344
}
345

346
bool
347
box_txn()
21✔
348
{
349
        return in_txn() != NULL;
21✔
350
}
351

352
int
353
box_txn_begin()
8,253✔
354
{
355
        try {
356
                if (in_txn())
8,253✔
357
                        tnt_raise(ClientError, ER_ACTIVE_TRANSACTION);
3✔
358
                (void) txn_begin(false);
8,250✔
359
        } catch (Exception  *e) {
6✔
360
                return -1; /* pass exception  through FFI */
3✔
361
        }
362
        return 0;
8,250✔
363
}
364

365
int
366
box_txn_commit()
7,073✔
367
{
368
        struct txn *txn = in_txn();
7,073✔
369
        /**
370
         * COMMIT is like BEGIN or ROLLBACK
371
         * a "transaction-initiating statement".
372
         * Do nothing if transaction is not started,
373
         * it's the same as BEGIN + COMMIT.
374
        */
375
        if (! txn)
7,073✔
376
                return 0;
14✔
377
        if (txn->in_sub_stmt) {
7,059✔
378
                diag_set(ClientError, ER_COMMIT_IN_SUB_STMT);
1✔
379
                return -1;
1✔
380
        }
381
        try {
382
                txn_commit(txn);
7,058✔
383
        } catch (Exception *e) {
309✔
384
                txn_rollback();
309✔
385
                return -1;
309✔
386
        }
387
        return 0;
6,749✔
388
}
389

390
int
391
box_txn_rollback()
1,195✔
392
{
393
        struct txn *txn = in_txn();
1,195✔
394
        if (txn && txn->in_sub_stmt) {
1,195✔
395
                diag_set(ClientError, ER_ROLLBACK_IN_SUB_STMT);
1✔
396
                return -1;
1✔
397
        }
398
        txn_rollback(); /* doesn't throw */
1,194✔
399
        return 0;
1,194✔
400
}
401

402
void *
403
box_txn_alloc(size_t size)
×
404
{
405
        union natural_align {
406
                void *p;
407
                double lf;
408
                long l;
409
        };
410
        return region_aligned_alloc(&fiber()->gc, size,
411
                                    alignof(union natural_align));
×
412
}
413

414
box_txn_savepoint_t *
415
box_txn_savepoint()
52✔
416
{
417
        struct txn *txn = in_txn();
52✔
418
        if (txn == NULL) {
52✔
419
                diag_set(ClientError, ER_SAVEPOINT_NO_TRANSACTION);
2✔
420
                return NULL;
2✔
421
        }
422
        struct txn_savepoint *svp =
423
                (struct txn_savepoint *) region_alloc_object(&fiber()->gc,
50✔
424
                                                        struct txn_savepoint);
425
        if (svp == NULL) {
50✔
426
                diag_set(OutOfMemory, sizeof(*svp) + alignof(*svp) - 1,
×
427
                         "region_alloc_object", "svp");
428
                return NULL;
×
429
        }
430
        if (stailq_empty(&txn->stmts)) {
50✔
431
                svp->is_first = true;
8✔
432
                return svp;
8✔
433
        }
434
        svp->is_first = false;
42✔
435
        svp->stmt = txn_last_stmt(txn);
42✔
436
        svp->in_sub_stmt = txn->in_sub_stmt;
42✔
437
        return svp;
42✔
438
}
439

440
int
441
box_txn_rollback_to_savepoint(box_txn_savepoint_t *svp)
44✔
442
{
443
        struct txn *txn = in_txn();
44✔
444
        if (txn == NULL) {
44✔
445
                diag_set(ClientError, ER_SAVEPOINT_NO_TRANSACTION);
×
446
                return -1;
×
447
        }
448
        struct txn_stmt *stmt = svp->stmt;
44✔
449
        if (!svp->is_first && (stmt == NULL || stmt->space == NULL ||
76✔
450
                               svp->in_sub_stmt != txn->in_sub_stmt)) {
32✔
451
                diag_set(ClientError, ER_NO_SUCH_SAVEPOINT);
8✔
452
                return -1;
8✔
453
        }
454
        struct stailq rollback_stmts;
455
        if (svp->is_first) {
36✔
456
                rollback_stmts = txn->stmts;
6✔
457
                stailq_create(&txn->stmts);
6✔
458
        } else {
459
                stailq_create(&rollback_stmts);
30✔
460
                stmt = stailq_next_entry(stmt, next);
30✔
461
                stailq_splice(&txn->stmts, &stmt->next, &rollback_stmts);
30✔
462
        }
463
        stailq_reverse(&rollback_stmts);
36✔
464
        stailq_foreach_entry(stmt, &rollback_stmts, next) {
88✔
465
                txn->engine->rollbackStatement(txn, stmt);
52✔
466
                if (stmt->row != NULL) {
52✔
467
                        stmt->row = NULL;
48✔
468
                        --txn->n_rows;
48✔
469
                        assert(txn->n_rows >= 0);
48✔
470
                }
471
                stmt->space = NULL;
52✔
472
        }
473
        return 0;
36✔
474
}
475

476
} /* extern "C" */
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc