tarantool / tarantool / 9253825433

27 May 2024 11:10AM UTC coverage: 85.789% (-0.006%) from 85.795%
Pull Request #10020: [backport 2.11] cmake: bump OpenSSL version 1.1.1 -> 3.2.1
Merge 051c335d9 into 69f2ddfcc

62596 of 113824 branches covered (54.99%)

93468 of 108951 relevant lines covered (85.79%)

2595923.97 hits per line
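As a quick arithmetic check, the build-level percentages above follow directly from the raw counts: 62596 / 113824 ≈ 0.5499, i.e. 54.99% of branches, and 93468 / 108951 ≈ 0.8579, i.e. 85.79% of relevant lines covered.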

Source File

/src/box/memtx_engine.cc (89.99% covered)
1
/*
2
 * Copyright 2010-2016, Tarantool AUTHORS, please see AUTHORS file.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 * 1. Redistributions of source code must retain the above
9
 *    copyright notice, this list of conditions and the
10
 *    following disclaimer.
11
 *
12
 * 2. Redistributions in binary form must reproduce the above
13
 *    copyright notice, this list of conditions and the following
14
 *    disclaimer in the documentation and/or other materials
15
 *    provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
18
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
19
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
21
 * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
22
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
25
 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
28
 * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
29
 * SUCH DAMAGE.
30
 */
31
#include "memtx_engine.h"
32

33
#include <small/quota.h>
34
#include <small/small.h>
35
#include <small/mempool.h>
36

37
#include "fiber.h"
38
#include "errinj.h"
39
#include "coio_file.h"
40
#include "info/info.h"
41
#include "tuple.h"
42
#include "txn.h"
43
#include "memtx_tx.h"
44
#include "memtx_tree.h"
45
#include "iproto_constants.h"
46
#include "xrow.h"
47
#include "xstream.h"
48
#include "bootstrap.h"
49
#include "replication.h"
50
#include "schema.h"
51
#include "gc.h"
52
#include "raft.h"
53
#include "txn_limbo.h"
54
#include "memtx_allocator.h"
55
#include "index.h"
56
#include "read_view.h"
57
#include "memtx_tuple_compression.h"
58
#include "memtx_space.h"
59
#include "memtx_space_upgrade.h"
60

61
#include <type_traits>
62

63
/* sync snapshot every 16MB */
64
#define SNAP_SYNC_INTERVAL        (1 << 24)
65

66
static void
67
checkpoint_cancel(struct checkpoint *ckpt);
68

69
static void
70
replica_join_cancel(struct cord *replica_join_cord);
71

72
enum {
73
        OBJSIZE_MIN = 16,
74
        SLAB_SIZE = 16 * 1024 * 1024,
75
        MIN_MEMORY_QUOTA = SLAB_SIZE * 4,
76
        MAX_TUPLE_SIZE = 1 * 1024 * 1024,
77
};
78

79
template <class ALLOC>
80
static inline void
81
create_memtx_tuple_format_vtab(struct tuple_format_vtab *vtab);
82

83
struct tuple *
84
(*memtx_tuple_new_raw)(struct tuple_format *format, const char *data,
85
                       const char *end, bool validate);
86

87
template <class ALLOC>
88
static inline struct tuple *
89
memtx_tuple_new_raw_impl(struct tuple_format *format, const char *data,
90
                         const char *end, bool validate);
91

92
template <class ALLOC>
93
static void
94
memtx_alloc_init(void)
3,260✔
95
{
96
        memtx_tuple_new_raw = memtx_tuple_new_raw_impl<ALLOC>;
3,260✔
97
}
3,260✔
98

99
static int
100
memtx_end_build_primary_key(struct space *space, void *param)
30,892✔
101
{
102
        struct memtx_space *memtx_space = (struct memtx_space *)space;
30,892✔
103
        if (space->engine != param || space_index(space, 0) == NULL ||
50,762✔
104
            memtx_space->replace == memtx_space_replace_all_keys)
19,870✔
105
                return 0;
28,231✔
106

107
        index_end_build(space->index[0]);
2,661✔
108
        memtx_space->replace = memtx_space_replace_primary_key;
2,661✔
109
        return 0;
2,661✔
110
}
111

112
/**
113
 * Build memtx secondary index based on the contents of primary index.
114
 */
115
static int
116
memtx_build_secondary_index(struct index *index, struct index *pk)
107✔
117
{
118
        ssize_t n_tuples = index_size(pk);
107✔
119
        if (n_tuples < 0)
107!
120
                return -1;
×
121
        uint32_t estimated_tuples = n_tuples * 1.2;
107✔
122

123
        index_begin_build(index);
107✔
124
        if (index_reserve(index, estimated_tuples) < 0)
107✔
125
                return -1;
1✔
126

127
        if (n_tuples > 0) {
106✔
128
                say_info("Adding %zd keys to %s index '%s' ...",
88!
129
                         n_tuples, index_type_strs[index->def->type],
130
                         index->def->name);
131
        }
132

133
        struct iterator *it = index_create_iterator(pk, ITER_ALL, NULL, 0);
106✔
134
        if (it == NULL)
106!
135
                return -1;
×
136

137
        int rc = 0;
106✔
138
        while (true) {
139
                struct tuple *tuple;
140
                rc = iterator_next_internal(it, &tuple);
211,306!
141
                if (rc != 0)
211,306!
142
                        break;
×
143
                if (tuple == NULL)
211,306✔
144
                        break;
106✔
145
                rc = index_build_next(index, tuple);
211,200!
146
                if (rc != 0)
211,200!
147
                        break;
×
148
        }
211,200✔
149
        iterator_delete(it);
106✔
150
        if (rc != 0)
106!
151
                return -1;
×
152

153
        index_end_build(index);
106✔
154
        return 0;
106✔
155
}
156

157
/**
158
 * Secondary indexes are built in bulk after all data is
159
 * recovered. This function enables secondary keys on a space.
160
 * Data dictionary spaces are an exception, they are fully
161
 * built right from the start.
162
 */
163
static int
164
memtx_build_secondary_keys(struct space *space, void *param)
30,888✔
165
{
166
        struct memtx_space *memtx_space = (struct memtx_space *)space;
30,888✔
167
        if (space->engine != param || space_index(space, 0) == NULL ||
50,750✔
168
            memtx_space->replace == memtx_space_replace_all_keys)
19,862✔
169
                return 0;
28,123✔
170

171
        if (space->index_id_max > 0) {
2,765✔
172
                struct index *pk = space->index[0];
85✔
173
                ssize_t n_tuples = index_size(pk);
85✔
174
                assert(n_tuples >= 0);
85!
175

176
                if (n_tuples > 0) {
85✔
177
                        say_info("Building secondary indexes in space '%s'...",
67!
178
                                 space_name(space));
179
                }
180

181
                for (uint32_t j = 1; j < space->index_count; j++) {
191✔
182
                        if (memtx_build_secondary_index(space->index[j],
107✔
183
                                                        pk) < 0)
107✔
184
                                return -1;
1✔
185
                }
186

187
                if (n_tuples > 0) {
84✔
188
                        say_info("Space '%s': done", space_name(space));
66!
189
                }
190
        }
191
        memtx_space->replace = memtx_space_replace_all_keys;
2,764✔
192
        return 0;
2,764✔
193
}
194

195
static void
196
memtx_engine_shutdown(struct engine *engine)
3,219✔
197
{
198
        struct memtx_engine *memtx = (struct memtx_engine *)engine;
3,219✔
199
        if (memtx->checkpoint != NULL)
3,219✔
200
                checkpoint_cancel(memtx->checkpoint);
7✔
201
        if (memtx->replica_join_cord != NULL)
3,219!
202
                replica_join_cancel(memtx->replica_join_cord);
×
203
        mempool_destroy(&memtx->iterator_pool);
3,219✔
204
        if (mempool_is_initialized(&memtx->rtree_iterator_pool))
3,219✔
205
                mempool_destroy(&memtx->rtree_iterator_pool);
16✔
206
        mempool_destroy(&memtx->index_extent_pool);
3,219✔
207
        slab_cache_destroy(&memtx->index_slab_cache);
3,219✔
208
        /*
209
         * The order is vital: allocator destroy should take place before
210
         * slab cache destroy!
211
         */
212
        memtx_allocators_destroy();
3,219✔
213
        slab_cache_destroy(&memtx->slab_cache);
3,219✔
214
        tuple_arena_destroy(&memtx->arena);
3,219✔
215

216
        xdir_destroy(&memtx->snap_dir);
3,219✔
217
        tuple_format_unref(memtx->func_key_format);
3,219✔
218
        free(memtx);
3,219✔
219
}
3,219✔
220

221
/**
222
 * State of memtx engine snapshot recovery.
223
 */
224
enum snapshot_recovery_state {
225
        /* Initial state. */
226
        SNAPSHOT_RECOVERY_NOT_STARTED,
227
        /* Set when at least one system space was recovered. */
228
        RECOVERING_SYSTEM_SPACES,
229
        /*
230
         * Set when all system spaces are recovered, i.e., when a user space
231
         * request or a non-insert request appears.
232
         */
233
        DONE_RECOVERING_SYSTEM_SPACES,
234
};
235

236
/**
237
 * Recovers one xrow from snapshot.
238
 *
239
 * @retval -1 error, diagnostic set
240
 * @retval 0 success
241
 */
242
static int
243
memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
244
                                  struct xrow_header *row,
245
                                  enum snapshot_recovery_state *state);
246

247
int
248
memtx_engine_recover_snapshot(struct memtx_engine *memtx,
689✔
249
                              const struct vclock *vclock)
250
{
251
        /* Process existing snapshot */
252
        say_info("recovery start");
689!
253
        int64_t signature = vclock_sum(vclock);
689✔
254
        const char *filename = xdir_format_filename(&memtx->snap_dir,
689!
255
                                                    signature, NONE);
256

257
        say_info("recovering from `%s'", filename);
689!
258
        struct xlog_cursor cursor;
259
        if (xlog_cursor_open(&cursor, filename) < 0)
689!
260
                return -1;
×
261

262
        int rc;
263
        struct xrow_header row;
264
        uint64_t row_count = 0;
689✔
265
        bool force_recovery = false;
689✔
266
        enum snapshot_recovery_state state = SNAPSHOT_RECOVERY_NOT_STARTED;
689✔
267
        while ((rc = xlog_cursor_next(&cursor, &row, force_recovery)) == 0) {
569,498!
268
                row.lsn = signature;
568,820✔
269
                rc = memtx_engine_recover_snapshot_row(memtx, &row, &state);
568,820!
270
                if (state == DONE_RECOVERING_SYSTEM_SPACES)
568,819✔
271
                        force_recovery = memtx->force_recovery;
301,638✔
272
                if (rc < 0) {
568,819✔
273
                        if (!force_recovery)
13✔
274
                                break;
10✔
275
                        say_error("can't apply row: ");
3!
276
                        diag_log();
3!
277
                }
278
                ++row_count;
568,809✔
279
                if (row_count % 100000 == 0) {
568,809✔
280
                        say_info_ratelimited("%.1fM rows processed",
2!
281
                                             row_count / 1e6);
282
                        fiber_yield_timeout(0);
2!
283
                }
284
        }
285
        xlog_cursor_close(&cursor, false);
688!
286
        if (rc < 0)
688✔
287
                return -1;
13✔
288

289
        /**
290
         * We should never try to read snapshots with no EOF
291
         * marker - such snapshots are very likely corrupted and
292
         * should not be trusted.
293
         */
294
        if (!xlog_cursor_is_eof(&cursor)) {
675✔
295
                if (!memtx->force_recovery)
3!
296
                        panic("snapshot `%s' has no EOF marker", cursor.name);
×
297
                else
298
                        say_error("snapshot `%s' has no EOF marker", cursor.name);
3!
299
        }
300

301
        /*
302
         * Snapshot entries are ordered by the space id, it means that if there
303
         * are no spaces, then all system spaces are definitely missing.
304
         */
305
        if (state == SNAPSHOT_RECOVERY_NOT_STARTED) {
675✔
306
                diag_set(ClientError, ER_MISSING_SYSTEM_SPACES);
1!
307
                return -1;
1✔
308
        }
309

310
        return 0;
674✔
311
}
312

313
static int
314
memtx_engine_recover_raft(const struct xrow_header *row)
2,765✔
315
{
316
        assert(row->type == IPROTO_RAFT);
2,765!
317
        struct raft_request req;
318
        /* Vclock is never persisted in WAL by Raft. */
319
        if (xrow_decode_raft(row, &req, NULL) != 0)
2,765!
320
                return -1;
2✔
321
        box_raft_recover(&req);
2,763!
322
        return 0;
2,763✔
323
}
324

325
static int
326
memtx_engine_recover_synchro(const struct xrow_header *row)
2,762✔
327
{
328
        assert(row->type == IPROTO_RAFT_PROMOTE);
2,762!
329
        struct synchro_request req;
330
        struct vclock synchro_vclock;
331
        if (xrow_decode_synchro(row, &req, &synchro_vclock) != 0)
2,762!
332
                return -1;
×
333
        /*
334
         * Origin id cannot be deduced from row.replica_id in a checkpoint,
335
         * because all its rows have a zero replica_id.
336
         */
337
        req.origin_id = req.replica_id;
2,762✔
338
        return txn_limbo_process(&txn_limbo, &req);
2,762!
339
}
340

341
/** Checks and updates the snapshot recovery state. */
342
static int
343
snapshot_recovery_state_update(enum snapshot_recovery_state *state,
1,381,350✔
344
                               bool is_system_space_request)
345
{
346
        switch (*state) {
1,381,350!
347
        case SNAPSHOT_RECOVERY_NOT_STARTED:
2,792✔
348
                if (!is_system_space_request) {
2,792✔
349
                        diag_set(ClientError, ER_MISSING_SYSTEM_SPACES);
2!
350
                        return -1;
2✔
351
                }
352
                *state = RECOVERING_SYSTEM_SPACES;
2,790✔
353
                break;
2,790✔
354
        case RECOVERING_SYSTEM_SPACES:
1,075,490✔
355
                if (!is_system_space_request)
1,075,490✔
356
                        *state = DONE_RECOVERING_SYSTEM_SPACES;
2,778✔
357
                break;
1,075,490✔
358
        case DONE_RECOVERING_SYSTEM_SPACES:
303,071✔
359
                break;
303,071✔
360
        }
361
        return 0;
1,381,350✔
362
}
363

364
static int
365
memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
1,381,350✔
366
                                  struct xrow_header *row,
367
                                  enum snapshot_recovery_state *state)
368
{
369
        assert(row->bodycnt == 1); /* always 1 for read */
1,381,350!
370
        if (row->type != IPROTO_INSERT) {
1,381,350✔
371
                if (snapshot_recovery_state_update(state, false) != 0)
5,530!
372
                        return -1;
1✔
373
                if (row->type == IPROTO_RAFT)
5,529✔
374
                        return memtx_engine_recover_raft(row);
2,765!
375
                if (row->type == IPROTO_RAFT_PROMOTE)
2,764✔
376
                        return memtx_engine_recover_synchro(row);
2,762!
377
                diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE,
2!
378
                         (uint32_t) row->type);
379
                return -1;
2✔
380
        }
381
        struct request request;
382
        RegionGuard region_guard(&fiber()->gc);
2,751,640!
383
        if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
1,375,820!
384
                return -1;
×
385
        bool is_system_space_request = space_id_is_system(request.space_id);
1,375,820✔
386
        if (snapshot_recovery_state_update(state, is_system_space_request) != 0)
1,375,820!
387
                return -1;
1✔
388
        struct space *space = space_cache_find(request.space_id);
1,375,820!
389
        if (space == NULL)
1,375,820✔
390
                goto log_request;
2✔
391
        /* memtx snapshot must contain only memtx spaces */
392
        if (space->engine != (struct engine *)memtx) {
1,375,820!
393
                diag_set(ClientError, ER_CROSS_ENGINE_TRANSACTION);
×
394
                goto log_request;
×
395
        }
396
        struct txn *txn;
397
        txn = txn_begin();
1,375,820!
398
        if (txn == NULL)
1,375,820!
399
                goto log_request;
×
400
        if (txn_begin_stmt(txn, space, request.type) != 0)
1,375,820!
401
                goto rollback;
×
402
        /* no access checks here - applier always works with admin privs */
403
        struct tuple *unused;
404
        if (space_execute_dml(space, txn, &request, &unused) != 0)
1,375,820!
405
                goto rollback_stmt;
×
406
        if (txn_commit_stmt(txn, &request) != 0)
1,375,820!
407
                goto rollback;
5✔
408
        /*
409
         * Snapshot rows are confirmed by definition. They don't need to go to
410
         * the synchronous transactions limbo.
411
         */
412
        txn_set_flags(txn, TXN_FORCE_ASYNC);
1,375,810✔
413
        return txn_commit(txn);
1,375,810!
414
rollback_stmt:
×
415
        txn_rollback_stmt(txn);
×
416
rollback:
5✔
417
        txn_abort(txn);
5!
418
log_request:
7✔
419
        say_error("error at request: %s", request_str(&request));
7!
420
        return -1;
7✔
421
}
422

423
/** Called at start to tell memtx to recover to a given LSN. */
424
static int
425
memtx_engine_begin_initial_recovery(struct engine *engine,
1,140✔
426
                                    const struct vclock *vclock)
427
{
428
        (void)vclock;
429
        struct memtx_engine *memtx = (struct memtx_engine *)engine;
1,140✔
430
        assert(memtx->state == MEMTX_INITIALIZED);
1,140!
431
        /*
432
         * By default, enable fast start: bulk read of tuples
433
         * from the snapshot, in which they are stored in key
434
         * order, and bulk build of the primary key.
435
         *
436
         * If force_recovery = true, it's a disaster
437
         * recovery mode. Enable all keys on start, to detect and
438
         * discard duplicates in the snapshot.
439
         */
440
        memtx->state = (memtx->force_recovery ?
1,140✔
441
                        MEMTX_OK : MEMTX_INITIAL_RECOVERY);
442
        return 0;
1,140✔
443
}
444

445
static int
446
memtx_engine_begin_final_recovery(struct engine *engine)
1,123✔
447
{
448
        struct memtx_engine *memtx = (struct memtx_engine *)engine;
1,123✔
449
        if (memtx->state == MEMTX_OK)
1,123✔
450
                return 0;
43✔
451

452
        assert(memtx->state == MEMTX_INITIAL_RECOVERY);
1,080!
453
        /* End of the fast path: loaded the primary key. */
454
        space_foreach(memtx_end_build_primary_key, memtx);
1,080✔
455

456
        /* Complete space initialization. */
457
        int rc = space_foreach(space_on_initial_recovery_complete, NULL);
1,080✔
458
        /* If failed - the snapshot has inconsistent data. We cannot start. */
459
        if (rc != 0) {
1,080!
460
                diag_log();
×
461
                panic("Failed to complete recovery from snapshot!");
×
462
        }
463

464
        if (!memtx->force_recovery && !memtx_tx_manager_use_mvcc_engine) {
1,080!
465
                /*
466
                 * Fast start path: "play out" WAL
467
                 * records using the primary key only,
468
                 * then bulk-build all secondary keys.
469
                 */
470
                memtx->state = MEMTX_FINAL_RECOVERY;
1,052✔
471
        } else {
472
                /*
473
                 * If force_recovery = true, it's
474
                 * a disaster recovery mode. Build
475
                 * secondary keys before reading the WAL,
476
                 * to detect and discard duplicates in
477
                 * unique keys.
478
                 */
479
                memtx->state = MEMTX_OK;
28✔
480
                if (space_foreach(memtx_build_secondary_keys, memtx) != 0)
28!
481
                        return -1;
×
482
                memtx->on_indexes_built_cb();
28✔
483
        }
484
        return 0;
1,080✔
485
}
486

487
static int
488
memtx_engine_begin_hot_standby(struct engine *engine)
3✔
489
{
490
        struct memtx_engine *memtx = (struct memtx_engine *)engine;
3✔
491
        /*
492
         * Build secondary indexes before entering the hot standby mode
493
         * to quickly switch to the hot standby instance after the master
494
         * instance exits.
495
         */
496
        if (memtx->state != MEMTX_OK) {
3!
497
                assert(memtx->state == MEMTX_FINAL_RECOVERY);
3!
498
                memtx->state = MEMTX_OK;
3✔
499
                if (space_foreach(memtx_build_secondary_keys, memtx) != 0)
3!
500
                        return -1;
×
501
                memtx->on_indexes_built_cb();
3✔
502
        }
503
        return 0;
3✔
504
}
505

506
static int
507
memtx_engine_end_recovery(struct engine *engine)
1,114✔
508
{
509
        struct memtx_engine *memtx = (struct memtx_engine *)engine;
1,114✔
510
        /*
511
         * Secondary keys have already been built in the following cases:
512
         * - force_recovery is set
513
         * - it's a replication join
514
         * - instance was in the hot standby mode
515
         */
516
        if (memtx->state != MEMTX_OK) {
1,114✔
517
                assert(memtx->state == MEMTX_FINAL_RECOVERY);
1,042!
518
                memtx->state = MEMTX_OK;
1,042✔
519
                if (space_foreach(memtx_build_secondary_keys, memtx) != 0)
1,042✔
520
                        return -1;
1✔
521
                memtx->on_indexes_built_cb();
1,041✔
522
        }
523
        xdir_collect_inprogress(&memtx->snap_dir);
1,113✔
524

525
        /* Complete space initialization. */
526
        int rc = space_foreach(space_on_final_recovery_complete, NULL);
1,113✔
527
        if (rc != 0) {
1,113!
528
                diag_log();
×
529
                panic("Failed to complete recovery from WAL!");
×
530
        }
531
        return 0;
1,113✔
532
}
533

534
static struct space *
535
memtx_engine_create_space(struct engine *engine, struct space_def *def,
547,203✔
536
                          struct rlist *key_list)
537
{
538
        struct memtx_engine *memtx = (struct memtx_engine *)engine;
547,203✔
539
        return memtx_space_new(memtx, def, key_list);
547,203✔
540
}
541

542
/** Engine read view implementation. */
543
struct memtx_read_view {
544
        /** Base class. */
545
        struct engine_read_view base;
546
        /**
547
         * Allocator read view that prevents tuples referenced by index read
548
         * views from being freed.
549
         */
550
        memtx_allocators_read_view allocators_rv;
551
};
552

553
static void
554
memtx_engine_read_view_free(struct engine_read_view *base)
4,199✔
555
{
556
        struct memtx_read_view *rv = (struct memtx_read_view *)base;
4,199✔
557
        memtx_allocators_close_read_view(rv->allocators_rv);
4,199✔
558
        free(rv);
4,199✔
559
}
4,199✔
560

561
/** Implementation of create_read_view engine callback. */
562
static struct engine_read_view *
563
memtx_engine_create_read_view(struct engine *engine,
4,199✔
564
                              const struct read_view_opts *opts)
565
{
566
        static const struct engine_read_view_vtab vtab = {
567
                .free = memtx_engine_read_view_free,
568
        };
569
        (void)engine;
570
        struct memtx_read_view *rv =
571
                (struct memtx_read_view *)xmalloc(sizeof(*rv));
4,199!
572
        rv->base.vtab = &vtab;
4,199✔
573
        rv->allocators_rv = memtx_allocators_open_read_view(opts);
4,199✔
574
        return (struct engine_read_view *)rv;
4,199✔
575
}
576

577
static int
578
memtx_engine_begin(struct engine *engine, struct txn *txn)
5,458,060✔
579
{
580
        (void)engine;
581
        txn_can_yield(txn, memtx_tx_manager_use_mvcc_engine);
5,458,060✔
582
        return 0;
5,458,060✔
583
}
584

585
static int
586
memtx_engine_prepare(struct engine *engine, struct txn *txn)
5,444,120✔
587
{
588
        (void)engine;
589
        if (memtx_tx_manager_use_mvcc_engine) {
5,444,120✔
590
                struct txn_stmt *stmt;
591
                stailq_foreach_entry(stmt, &txn->stmts, next) {
164,153✔
592
                        assert(stmt->space->engine == engine);
85,365!
593
                        memtx_tx_history_prepare_stmt(stmt);
85,365✔
594
                }
595
                memtx_tx_prepare_finalize(txn);
78,788✔
596
        }
597
        if (txn->is_schema_changed)
5,444,120✔
598
                memtx_tx_abort_all_for_ddl(txn);
1,575,460✔
599
        return 0;
5,444,120✔
600
}
601

602
static void
603
memtx_engine_commit(struct engine *engine, struct txn *txn)
5,443,840✔
604
{
605
        (void)engine;
606
        struct txn_stmt *stmt;
607
        stailq_foreach_entry(stmt, &txn->stmts, next) {
18,361,100✔
608
                if (memtx_tx_manager_use_mvcc_engine) {
12,917,300✔
609
                        assert(stmt->space->engine == engine);
85,354!
610
                        struct memtx_space *mspace =
85,354✔
611
                                (struct memtx_space *)stmt->space;
612
                        size_t *bsize = &mspace->bsize;
85,354✔
613
                        memtx_tx_history_commit_stmt(stmt, bsize);
85,354✔
614
                }
615
                if (stmt->engine_savepoint != NULL) {
12,917,300✔
616
                        struct space *space = stmt->space;
12,602,800✔
617
                        struct tuple *old_tuple = stmt->rollback_info.old_tuple;
12,602,800✔
618
                        if (space->upgrade != NULL && old_tuple != NULL)
12,602,800!
619
                                memtx_space_upgrade_untrack_tuple(
×
620
                                                space->upgrade, old_tuple);
621
                }
622
        }
623
}
5,443,840✔
624

625
static void
626
memtx_engine_rollback_statement(struct engine *engine, struct txn *txn,
31,810✔
627
                                struct txn_stmt *stmt)
628
{
629
        (void)engine;
630
        (void)txn;
631
        struct tuple *old_tuple = stmt->rollback_info.old_tuple;
31,810✔
632
        struct tuple *new_tuple = stmt->rollback_info.new_tuple;
31,810✔
633
        if (old_tuple == NULL && new_tuple == NULL)
31,810✔
634
                return;
10,686✔
635
        struct space *space = stmt->space;
21,124✔
636
        if (space == NULL) {
21,124!
637
                /* The space was deleted. Nothing to rollback. */
638
                return;
×
639
        }
640
        struct memtx_space *memtx_space = (struct memtx_space *)space;
21,124✔
641
        uint32_t index_count;
642

643
        /* Only roll back the changes if they were made. */
644
        if (stmt->engine_savepoint == NULL)
21,124✔
645
                return;
23✔
646

647
        if (space->upgrade != NULL && new_tuple != NULL)
21,101!
648
                memtx_space_upgrade_untrack_tuple(space->upgrade, new_tuple);
×
649

650
        if (memtx_tx_manager_use_mvcc_engine)
21,101✔
651
                return memtx_tx_history_rollback_stmt(stmt);
11,686✔
652

653
        if (memtx_space->replace == memtx_space_replace_all_keys)
9,415✔
654
                index_count = space->index_count;
9,399✔
655
        else if (memtx_space->replace == memtx_space_replace_primary_key)
16✔
656
                index_count = 1;
15✔
657
        else
658
                panic("transaction rolled back during snapshot recovery");
1!
659

660
        for (uint32_t i = 0; i < index_count; i++) {
23,873✔
661
                struct tuple *unused;
662
                struct index *index = space->index[i];
14,459✔
663
                /* Rollback must not fail. */
664
                if (index_replace(index, new_tuple, old_tuple,
14,459!
665
                                  DUP_INSERT, &unused, &unused) != 0) {
14,459!
666
                        diag_log();
×
667
                        unreachable();
×
668
                        panic("failed to rollback change");
669
                }
670
        }
671

672
        memtx_space_update_bsize(space, new_tuple, old_tuple);
9,414✔
673
        if (old_tuple != NULL)
9,414✔
674
                tuple_ref(old_tuple);
5,048✔
675
        if (new_tuple != NULL)
9,414✔
676
                tuple_unref(new_tuple);
7,289✔
677
}
678

679
static int
680
memtx_engine_bootstrap(struct engine *engine)
2,105✔
681
{
682
        struct memtx_engine *memtx = (struct memtx_engine *)engine;
2,105✔
683

684
        assert(memtx->state == MEMTX_INITIALIZED);
2,105!
685
        memtx->state = MEMTX_OK;
2,105✔
686

687
        /* Recover from bootstrap.snap */
688
        say_info("initializing an empty data directory");
2,105!
689
        struct xlog_cursor cursor;
690
        if (xlog_cursor_openmem(&cursor, (const char *)bootstrap_bin,
2,105!
691
                                sizeof(bootstrap_bin), "bootstrap") < 0)
2,105!
692
                return -1;
×
693

694
        int rc;
695
        struct xrow_header row;
696
        enum snapshot_recovery_state state = SNAPSHOT_RECOVERY_NOT_STARTED;
2,105✔
697
        while ((rc = xlog_cursor_next(&cursor, &row, true)) == 0) {
814,635!
698
                rc = memtx_engine_recover_snapshot_row(memtx, &row, &state);
812,530!
699
                if (rc < 0)
812,530!
700
                        break;
×
701
        }
702
        xlog_cursor_close(&cursor, false);
2,105!
703
        if (rc < 0)
2,105!
704
                return -1;
×
705
        memtx->on_indexes_built_cb();
2,105!
706
        return 0;
2,105✔
707
}
708

709
static int
710
checkpoint_write_row(struct xlog *l, struct xrow_header *row)
1,894,310✔
711
{
712
        static ev_tstamp last = 0;
713
        if (last == 0) {
1,894,310✔
714
                ev_now_update(loop());
2,683!
715
                last = ev_now(loop());
2,683!
716
        }
717

718
        row->tm = last;
1,894,310✔
719
        row->replica_id = 0;
1,894,310✔
720
        /**
721
         * Rows in snapshot are numbered from 1 to %rows.
722
         * This makes streaming such rows to a replica or
723
         * to recovery look similar to streaming a normal
724
         * WAL. @sa the place which skips old rows in
725
         * recovery_apply_row().
726
         */
727
        row->lsn = l->rows + l->tx_rows;
1,894,310✔
728
        row->sync = 0; /* don't write sync to wal */
1,894,310✔
729

730
        if (xlog_write_row(l, row) < 0)
1,894,310!
731
                return -1;
×
732

733
        if ((l->rows + l->tx_rows) % 100000 == 0) {
1,894,310✔
734
                say_info_ratelimited("%.1fM rows written",
4!
735
                                     (l->rows + l->tx_rows) / 1e6);
736
        }
737
        return 0;
1,894,310✔
738

739
}
740

741
static int
742
checkpoint_write_tuple(struct xlog *l, uint32_t space_id, uint32_t group_id,
1,887,310✔
743
                       const char *data, uint32_t size)
744
{
745
        struct request_replace_body body;
746
        request_replace_body_create(&body, space_id);
1,887,310✔
747

748
        struct xrow_header row;
749
        memset(&row, 0, sizeof(struct xrow_header));
1,887,310✔
750
        row.type = IPROTO_INSERT;
1,887,310✔
751
        row.group_id = group_id;
1,887,310✔
752

753
        row.bodycnt = 2;
1,887,310✔
754
        row.body[0].iov_base = &body;
1,887,310✔
755
        row.body[0].iov_len = sizeof(body);
1,887,310✔
756
        row.body[1].iov_base = (char *)data;
1,887,310✔
757
        row.body[1].iov_len = size;
1,887,310✔
758
        return checkpoint_write_row(l, &row);
3,774,620!
759
}
760

761
struct checkpoint {
762
        /** Database read view written to the snapshot file. */
763
        struct read_view rv;
764
        struct cord cord;
765
        bool waiting_for_snap_thread;
766
        /** The vclock of the snapshot file. */
767
        struct vclock vclock;
768
        struct xdir dir;
769
        struct raft_request raft;
770
        struct synchro_request synchro_state;
771
        /** The limbo confirmed vclock at the moment of checkpoint creation. */
772
        struct vclock synchro_vclock;
773
        /**
774
         * Do nothing, just touch the snapshot file - the
775
         * checkpoint already exists.
776
         */
777
        bool touch;
778
};
779

780
/** Space filter for checkpoint. */
781
static bool
782
checkpoint_space_filter(struct space *space, void *arg)
62,625✔
783
{
784
        (void)arg;
785
        return space_is_memtx(space) &&
125,250!
786
                space_index(space, 0) != NULL;
125,250✔
787
}
788

789
/**
790
 * In case of checkpoint and replica join, we're only interested in tuples
791
 * stored in space so we don't need to include secondary indexes into the read
792
 * view. This function filters out all secondary indexes.
793
 */
794
static bool
795
primary_index_filter(struct space *space, struct index *index, void *arg)
141,560✔
796
{
797
        (void)space;
798
        (void)arg;
799
        return index->def->iid == 0;
141,560✔
800
}
801

802
static struct checkpoint *
803
checkpoint_new(const char *snap_dirname, uint64_t snap_io_rate_limit)
3,742✔
804
{
805
        struct checkpoint *ckpt = (struct checkpoint *)malloc(sizeof(*ckpt));
3,742✔
806
        if (ckpt == NULL) {
3,742!
807
                diag_set(OutOfMemory, sizeof(*ckpt), "malloc",
×
808
                         "struct checkpoint");
809
                return NULL;
×
810
        }
811
        struct read_view_opts rv_opts;
812
        read_view_opts_create(&rv_opts);
3,742!
813
        rv_opts.name = "checkpoint";
3,742✔
814
        rv_opts.is_system = true;
3,742✔
815
        rv_opts.filter_space = checkpoint_space_filter;
3,742✔
816
        rv_opts.filter_index = primary_index_filter;
3,742✔
817
        if (read_view_open(&ckpt->rv, &rv_opts) != 0) {
3,742!
818
                free(ckpt);
×
819
                return NULL;
×
820
        }
821
        ckpt->waiting_for_snap_thread = false;
3,742✔
822
        struct xlog_opts opts = xlog_opts_default;
3,742✔
823
        opts.rate_limit = snap_io_rate_limit;
3,742✔
824
        opts.sync_interval = SNAP_SYNC_INTERVAL;
3,742✔
825
        opts.free_cache = true;
3,742✔
826
        xdir_create(&ckpt->dir, snap_dirname, SNAP, &INSTANCE_UUID, &opts);
3,742!
827
        vclock_create(&ckpt->vclock);
3,742✔
828
        box_raft_checkpoint_local(&ckpt->raft);
3,742!
829
        txn_limbo_checkpoint(&txn_limbo, &ckpt->synchro_state,
3,742!
830
                             &ckpt->synchro_vclock);
831
        ckpt->touch = false;
3,742✔
832
        return ckpt;
3,742✔
833
}
834

835
static void
836
checkpoint_delete(struct checkpoint *ckpt)
3,742✔
837
{
838
        read_view_close(&ckpt->rv);
3,742✔
839
        xdir_destroy(&ckpt->dir);
3,742✔
840
        free(ckpt);
3,742✔
841
}
3,742✔
842

843
static void
844
checkpoint_cancel(struct checkpoint *ckpt)
7✔
845
{
846
        /*
847
         * Cancel the checkpoint thread if it's running and wait
848
         * for it to terminate so as to eliminate the possibility
849
         * of use-after-free.
850
         */
851
        if (ckpt->waiting_for_snap_thread)
7✔
852
                cord_cancel_and_join(&ckpt->cord);
6✔
853
        checkpoint_delete(ckpt);
7✔
854
}
7✔
855

856
static void
857
replica_join_cancel(struct cord *replica_join_cord)
×
858
{
859
        /*
860
         * Cancel the thread being used to join replica if it's
861
         * running and wait for it to terminate so as to
862
         * eliminate the possibility of use-after-free.
863
         */
864
        cord_cancel_and_join(replica_join_cord);
×
865
}
×
866

867
static int
868
checkpoint_write_raft(struct xlog *l, const struct raft_request *req)
3,499✔
869
{
870
        struct xrow_header row;
871
        struct region *region = &fiber()->gc;
3,499!
872
        RegionGuard region_guard(&fiber()->gc);
6,998!
873
        xrow_encode_raft(&row, region, req);
3,499!
874
        if (checkpoint_write_row(l, &row) != 0)
3,499!
875
                return -1;
×
876
        return 0;
3,499✔
877
}
878

879
static int
880
checkpoint_write_synchro(struct xlog *l, const struct synchro_request *req)
3,499✔
881
{
882
        struct xrow_header row;
883
        char body[XROW_BODY_LEN_MAX];
884
        xrow_encode_synchro(&row, body, req);
3,499!
885
        return checkpoint_write_row(l, &row);
6,998!
886
}
887

888
#ifndef NDEBUG
889
/*
890
 * The functions defined below are used in tests to write a corrupted
891
 * snapshot file.
892
 */
893

894
/** Writes an INSERT row with a corrupted body to the snapshot. */
895
static int
896
checkpoint_write_corrupted_insert_row(struct xlog *l)
2✔
897
{
898
        /* Sic: Array of ten elements but only one is encoded. */
899
        char buf[8];
900
        char *p = mp_encode_array(buf, 10);
2✔
901
        p = mp_encode_uint(p, 0);
2✔
902
        assert((size_t)(p - buf) <= sizeof(buf));
2!
903
        return checkpoint_write_tuple(l, /*space_id=*/512, GROUP_DEFAULT,
4✔
904
                                      buf, p - buf);
4!
905
}
906

907
/** Writes an INSERT row for a missing space to the snapshot. */
908
static int
909
checkpoint_write_missing_space_row(struct xlog *l)
1✔
910
{
911
        char buf[8];
912
        char *p = mp_encode_array(buf, 1);
1✔
913
        p = mp_encode_uint(p, 0);
1✔
914
        assert((size_t)(p - buf) <= sizeof(buf));
1!
915
        return checkpoint_write_tuple(l, /*space_id=*/777, GROUP_DEFAULT,
2✔
916
                                      buf, p - buf);
2!
917
}
918

919
/** Writes a row with an unknown type to the snapshot. */
920
static int
921
checkpoint_write_unknown_row_type(struct xlog *l)
1✔
922
{
923
        struct xrow_header row;
924
        memset(&row, 0, sizeof(row));
1✔
925
        row.type = 777;
1✔
926
        row.bodycnt = 1;
1✔
927
        char buf[8];
928
        char *p = mp_encode_map(buf, 0);
1✔
929
        assert((size_t)(p - buf) <= sizeof(buf));
1!
930
        row.body[0].iov_base = buf;
1✔
931
        row.body[0].iov_len = p - buf;
1✔
932
        return checkpoint_write_row(l, &row);
2!
933
}
934

935
/** Writes an invalid system row (not INSERT) to the snapshot. */
936
static int
937
checkpoint_write_invalid_system_row(struct xlog *l)
1✔
938
{
939
        struct xrow_header row;
940
        memset(&row, 0, sizeof(row));
1✔
941
        row.type = IPROTO_RAFT;
1✔
942
        row.group_id = GROUP_LOCAL;
1✔
943
        row.bodycnt = 1;
1✔
944
        char buf[8];
945
        char *p = mp_encode_map(buf, 1);
1✔
946
        p = mp_encode_uint(p, IPROTO_RAFT_TERM);
1✔
947
        p = mp_encode_nil(p);
1✔
948
        assert((size_t)(p - buf) <= sizeof(buf));
1!
949
        row.body[0].iov_base = buf;
1✔
950
        row.body[0].iov_len = p - buf;
1✔
951
        return checkpoint_write_row(l, &row);
2!
952
}
953
#endif /* NDEBUG */
954

955
static int
956
checkpoint_f(va_list ap)
3,737✔
957
{
958
        int rc = 0;
3,737✔
959
        struct checkpoint *ckpt = va_arg(ap, struct checkpoint *);
3,737✔
960

961
        if (ckpt->touch) {
3,737✔
962
                if (xdir_touch_xlog(&ckpt->dir, &ckpt->vclock) == 0)
234!
963
                        return 0;
233✔
964
                /*
965
                 * Failed to touch an existing snapshot, create
966
                 * a new one.
967
                 */
968
                ckpt->touch = false;
1✔
969
        }
970

971
        struct xlog snap;
972
        if (xdir_create_xlog(&ckpt->dir, &snap, &ckpt->vclock) != 0)
3,504!
973
                return -1;
1✔
974

975
        say_info("saving snapshot `%s'", snap.filename);
3,503!
976
        ERROR_INJECT_SLEEP(ERRINJ_SNAP_WRITE_DELAY);
5,533!
977
        ERROR_INJECT(ERRINJ_SNAP_SKIP_ALL_ROWS, goto done);
3,500!
978
        struct space_read_view *space_rv;
979
        read_view_foreach_space(space_rv, &ckpt->rv) {
124,414✔
980
                FiberGCChecker gc_check;
×
981
                bool skip = false;
58,708✔
982
                ERROR_INJECT(ERRINJ_SNAP_SKIP_DDL_ROWS, {
58,708!
983
                        skip = space_id_is_system(space_rv->id);
984
                });
985
                if (skip)
58,708✔
986
                        continue;
16✔
987
                struct index_read_view *index_rv =
988
                        space_read_view_index(space_rv, 0);
58,692✔
989
                assert(index_rv != NULL);
58,692!
990
                struct index_read_view_iterator it;
991
                if (index_read_view_create_iterator(index_rv, ITER_ALL,
58,692!
992
                                                    NULL, 0, &it) != 0) {
58,692!
993
                        rc = -1;
×
994
                        break;
×
995
                }
996
                while (true) {
997
                        RegionGuard region_guard(&fiber()->gc);
1,946,000!
998
                        struct read_view_tuple result;
999
                        rc = index_read_view_iterator_next_raw(&it, &result);
1,946,000!
1000
                        if (rc != 0 || result.data == NULL)
1,946,000!
1001
                                break;
1002
                        rc = checkpoint_write_tuple(&snap, space_rv->id,
1,887,310!
1003
                                                    space_rv->group_id,
1004
                                                    result.data, result.size);
1005
                        if (rc != 0)
1,887,310!
1006
                                break;
×
1007
                }
1,887,310✔
1008
                index_read_view_iterator_destroy(&it);
58,692✔
1009
                if (rc != 0)
58,692!
1010
                        break;
×
1011
        }
1012
        if (rc != 0)
3,499!
1013
                goto fail;
×
1014
        ERROR_INJECT(ERRINJ_SNAP_WRITE_CORRUPTED_INSERT_ROW, {
3,499!
1015
                if (checkpoint_write_corrupted_insert_row(&snap) != 0)
1016
                        goto fail;
1017
        });
1018
        ERROR_INJECT(ERRINJ_SNAP_WRITE_MISSING_SPACE_ROW, {
3,499!
1019
                if (checkpoint_write_missing_space_row(&snap) != 0)
1020
                        goto fail;
1021
        });
1022
        ERROR_INJECT(ERRINJ_SNAP_WRITE_UNKNOWN_ROW_TYPE, {
3,499!
1023
                if (checkpoint_write_unknown_row_type(&snap) != 0)
1024
                        goto fail;
1025
        });
1026
        ERROR_INJECT(ERRINJ_SNAP_WRITE_INVALID_SYSTEM_ROW, {
3,499!
1027
                if (checkpoint_write_invalid_system_row(&snap) != 0)
1028
                        goto fail;
1029
        });
1030
        if (checkpoint_write_raft(&snap, &ckpt->raft) != 0)
3,499!
1031
                goto fail;
×
1032
        if (checkpoint_write_synchro(&snap, &ckpt->synchro_state) != 0)
3,499!
1033
                goto fail;
×
1034
        goto done;
3,499✔
1035
done:
3,500✔
1036
        if (xlog_flush(&snap) < 0)
3,500✔
1037
                goto fail;
2✔
1038

1039
        xlog_close(&snap, false);
3,497✔
1040
        say_info("done");
3,496!
1041
        return 0;
3,496✔
1042
fail:
2✔
1043
        xlog_close(&snap, false);
2!
1044
        return -1;
2✔
1045
}
1046

1047
static int
1048
memtx_engine_begin_checkpoint(struct engine *engine, bool is_scheduled)
3,742✔
1049
{
1050
        (void) is_scheduled;
1051
        struct memtx_engine *memtx = (struct memtx_engine *)engine;
3,742✔
1052

1053
        assert(memtx->checkpoint == NULL);
3,742!
1054
        memtx->checkpoint = checkpoint_new(memtx->snap_dir.dirname,
3,742✔
1055
                                           memtx->snap_io_rate_limit);
1056
        if (memtx->checkpoint == NULL)
3,742!
1057
                return -1;
×
1058
        return 0;
3,742✔
1059
}
1060

1061
static int
1062
memtx_engine_wait_checkpoint(struct engine *engine,
3,737✔
1063
                             const struct vclock *vclock)
1064
{
1065
        struct memtx_engine *memtx = (struct memtx_engine *)engine;
3,737✔
1066

1067
        assert(memtx->checkpoint != NULL);
3,737!
1068
        /*
1069
         * If a snapshot already exists, do not create a new one.
1070
         */
1071
        struct vclock last;
1072
        if (xdir_last_vclock(&memtx->snap_dir, &last) >= 0 &&
4,931!
1073
            vclock_compare(&last, vclock) == 0) {
1,194✔
1074
                memtx->checkpoint->touch = true;
234✔
1075
        }
1076
        vclock_copy(&memtx->checkpoint->vclock, vclock);
3,737✔
1077

1078
        if (cord_costart(&memtx->checkpoint->cord, "snapshot",
7,474✔
1079
                         checkpoint_f, memtx->checkpoint)) {
3,737!
1080
                return -1;
×
1081
        }
1082
        memtx->checkpoint->waiting_for_snap_thread = true;
3,737✔
1083

1084
        /* wait for memtx-part snapshot completion */
1085
        int result = cord_cojoin(&memtx->checkpoint->cord);
3,737!
1086
        if (result != 0)
3,731✔
1087
                diag_log();
3!
1088

1089
        memtx->checkpoint->waiting_for_snap_thread = false;
3,731✔
1090
        return result;
3,731✔
1091
}
1092

1093
static void
1094
memtx_engine_commit_checkpoint(struct engine *engine,
3,716✔
1095
                               const struct vclock *vclock)
1096
{
1097
        ERROR_INJECT_TERMINATE(ERRINJ_SNAP_COMMIT_FAIL);
3,716!
1098
        (void) vclock;
1099
        struct memtx_engine *memtx = (struct memtx_engine *)engine;
3,716✔
1100

1101
        /* beginCheckpoint() must have been done */
1102
        assert(memtx->checkpoint != NULL);
3,716!
1103
        /* waitCheckpoint() must have been done. */
1104
        assert(!memtx->checkpoint->waiting_for_snap_thread);
3,716!
1105

1106
        if (!memtx->checkpoint->touch) {
3,716✔
1107
                int64_t lsn = vclock_sum(&memtx->checkpoint->vclock);
3,484✔
1108
                struct xdir *dir = &memtx->checkpoint->dir;
3,484✔
1109
                /* rename snapshot on completion */
1110
                char to[PATH_MAX];
1111
                snprintf(to, sizeof(to), "%s",
3,484!
1112
                         xdir_format_filename(dir, lsn, NONE));
1113
                const char *from = xdir_format_filename(dir, lsn, INPROGRESS);
3,484!
1114
                ERROR_INJECT_YIELD(ERRINJ_SNAP_COMMIT_DELAY);
3,484!
1115
                int rc = coio_rename(from, to);
3,484!
1116
                if (rc != 0)
3,484!
1117
                        panic("can't rename .snap.inprogress");
×
1118
        }
1119

1120
        struct vclock last;
1121
        if (xdir_last_vclock(&memtx->snap_dir, &last) < 0 ||
4,891!
1122
            vclock_compare(&last, vclock) != 0) {
1,175✔
1123
                /* Add the new checkpoint to the set. */
1124
                xdir_add_vclock(&memtx->snap_dir, &memtx->checkpoint->vclock);
3,483!
1125
        }
1126

1127
        checkpoint_delete(memtx->checkpoint);
3,716!
1128
        memtx->checkpoint = NULL;
3,716✔
1129
}
3,716✔
1130

1131
static void
1132
memtx_engine_abort_checkpoint(struct engine *engine)
19✔
1133
{
1134
        struct memtx_engine *memtx = (struct memtx_engine *)engine;
19✔
1135

1136
        /**
1137
         * An error in the other engine's first phase.
1138
         */
1139
        if (memtx->checkpoint->waiting_for_snap_thread) {
19!
1140
                /* wait for memtx-part snapshot completion */
1141
                if (cord_cojoin(&memtx->checkpoint->cord) != 0)
×
1142
                        diag_log();
×
1143
                memtx->checkpoint->waiting_for_snap_thread = false;
×
1144
        }
1145

1146
        /** Remove garbage .inprogress file. */
1147
        const char *filename =
1148
                xdir_format_filename(&memtx->checkpoint->dir,
19✔
1149
                                     vclock_sum(&memtx->checkpoint->vclock),
19✔
1150
                                     INPROGRESS);
1151
        (void) coio_unlink(filename);
19✔
1152

1153
        checkpoint_delete(memtx->checkpoint);
19✔
1154
        memtx->checkpoint = NULL;
19✔
1155
}
19✔
1156

1157
static void
1158
memtx_engine_collect_garbage(struct engine *engine, const struct vclock *vclock)
734✔
1159
{
1160
        struct memtx_engine *memtx = (struct memtx_engine *)engine;
734✔
1161
        xdir_collect_garbage(&memtx->snap_dir, vclock_sum(vclock),
734✔
1162
                             XDIR_GC_ASYNC);
1163
}
734✔
1164

1165
static int
1166
memtx_engine_backup(struct engine *engine, const struct vclock *vclock,
8✔
1167
                    engine_backup_cb cb, void *cb_arg)
1168
{
1169
        struct memtx_engine *memtx = (struct memtx_engine *)engine;
8✔
1170
        const char *filename = xdir_format_filename(&memtx->snap_dir,
8✔
1171
                                                    vclock_sum(vclock), NONE);
1172
        return cb(filename, cb_arg);
8✔
1173
}
1174

1175
struct memtx_join_ctx {
1176
        /** Database read view sent to the replica. */
1177
        struct read_view rv;
1178
        struct xstream *stream;
1179
};
1180

1181
/** Space filter for replica join. */
1182
static bool
1183
memtx_join_space_filter(struct space *space, void *arg)
7,494✔
1184
{
1185
        (void)arg;
1186
        return space_is_memtx(space) &&
7,494✔
1187
               !space_is_local(space) &&
14,979!
1188
               space_index(space, 0) != NULL;
14,979✔
1189
}
1190

1191
static int
1192
memtx_engine_prepare_join(struct engine *engine, void **arg)
457✔
1193
{
1194
        (void)engine;
1195
        struct memtx_join_ctx *ctx =
1196
                (struct memtx_join_ctx *)malloc(sizeof(*ctx));
457✔
1197
        if (ctx == NULL) {
457!
1198
                diag_set(OutOfMemory, sizeof(*ctx),
×
1199
                         "malloc", "struct memtx_join_ctx");
1200
                return -1;
×
1201
        }
1202
        struct read_view_opts rv_opts;
1203
        read_view_opts_create(&rv_opts);
457!
1204
        rv_opts.name = "join";
457✔
1205
        rv_opts.is_system = true;
457✔
1206
        rv_opts.filter_space = memtx_join_space_filter;
457✔
1207
        rv_opts.filter_index = primary_index_filter;
457✔
1208
        if (read_view_open(&ctx->rv, &rv_opts) != 0) {
457!
1209
                free(ctx);
×
1210
                return -1;
×
1211
        }
1212
        *arg = ctx;
457✔
1213
        return 0;
457✔
1214
}
1215

1216
static int
1217
memtx_join_send_tuple(struct xstream *stream, uint32_t space_id,
190,362✔
1218
                      const char *data, size_t size)
1219
{
1220
        struct request_replace_body body;
1221
        request_replace_body_create(&body, space_id);
190,362✔
1222

1223
        struct xrow_header row;
1224
        memset(&row, 0, sizeof(row));
190,362✔
1225
        row.type = IPROTO_INSERT;
190,362✔
1226

1227
        row.bodycnt = 2;
190,362✔
1228
        row.body[0].iov_base = &body;
190,362✔
1229
        row.body[0].iov_len = sizeof(body);
190,362✔
1230
        row.body[1].iov_base = (char *)data;
190,362✔
1231
        row.body[1].iov_len = size;
190,362✔
1232

1233
        return xstream_write(stream, &row);
380,724!
1234
}
1235

1236
static int
1237
memtx_join_f(va_list ap)
455✔
1238
{
1239
        int rc = 0;
455✔
1240
        struct memtx_join_ctx *ctx = va_arg(ap, struct memtx_join_ctx *);
455✔
1241
        struct space_read_view *space_rv;
1242
        read_view_foreach_space(space_rv, &ctx->rv) {
15,808✔
1243
                FiberGCChecker gc_check;
×
1244
                struct index_read_view *index_rv =
1245
                        space_read_view_index(space_rv, 0);
7,449✔
1246
                assert(index_rv != NULL);
7,449!
1247
                struct index_read_view_iterator it;
1248
                if (index_read_view_create_iterator(index_rv, ITER_ALL,
7,449!
1249
                                                    NULL, 0, &it) != 0) {
7,449!
1250
                        rc = -1;
×
1251
                        break;
×
1252
                }
1253
                while (true) {
1254
                        RegionGuard region_guard(&fiber()->gc);
197,811!
1255
                        struct read_view_tuple result;
1256
                        rc = index_read_view_iterator_next_raw(&it, &result);
197,811!
1257
                        if (rc != 0 || result.data == NULL)
197,811!
1258
                                break;
1259
                        rc = memtx_join_send_tuple(ctx->stream, space_rv->id,
380,724✔
1260
                                                   result.data, result.size);
190,362!
1261
                        if (rc != 0)
190,362!
1262
                                break;
×
1263
                }
190,362✔
1264
                index_read_view_iterator_destroy(&it);
7,449✔
1265
                if (rc != 0)
7,449!
1266
                        break;
×
1267
        }
1268
        return rc;
455✔
1269
}
1270

1271
static int
1272
memtx_engine_join(struct engine *engine, void *arg, struct xstream *stream)
455✔
1273
{
1274
        (void)engine;
1275
        struct memtx_join_ctx *ctx = (struct memtx_join_ctx *)arg;
455✔
1276
        ctx->stream = stream;
455✔
1277
        /*
1278
         * Memtx snapshot iterators are safe to use from another
1279
         * thread and so we do so as not to consume too much of
1280
         * precious tx cpu time while a new replica is joining.
1281
         */
1282
        struct cord cord;
1283
        if (cord_costart(&cord, "initial_join", memtx_join_f, ctx) != 0)
455!
1284
                return -1;
×
1285
        struct memtx_engine *memtx = (struct memtx_engine *)engine;
455✔
1286
        memtx->replica_join_cord = &cord;
455✔
1287
        int res = cord_cojoin(&cord);
455!
1288
        memtx->replica_join_cord = NULL;
455✔
1289
        xstream_reset(stream);
455!
1290
        return res;
455✔
1291
}
1292

1293
static void
1294
memtx_engine_complete_join(struct engine *engine, void *arg)
457✔
1295
{
1296
        (void)engine;
1297
        struct memtx_join_ctx *ctx = (struct memtx_join_ctx *)arg;
457✔
1298
        read_view_close(&ctx->rv);
457✔
1299
        free(ctx);
457✔
1300
}
457✔
1301

1302
static void
1303
memtx_engine_memory_stat(struct engine *engine, struct engine_memory_stat *stat)
73✔
1304
{
1305
        struct memtx_engine *memtx = (struct memtx_engine *)engine;
73✔
1306
        struct allocator_stats data_stats;
1307
        struct mempool_stats index_stats;
1308
        mempool_stats(&memtx->index_extent_pool, &index_stats);
73!
1309
        memset(&data_stats, 0, sizeof(data_stats));
73✔
1310
        allocators_stats(&data_stats);
73!
1311
        stat->data += data_stats.small.used;
73✔
1312
        stat->data += data_stats.sys.used;
73✔
1313
        stat->index += index_stats.totals.used;
73✔
1314
}
73✔
1315

1316
static const struct engine_vtab memtx_engine_vtab = {
1317
        /* .shutdown = */ memtx_engine_shutdown,
1318
        /* .create_space = */ memtx_engine_create_space,
1319
        /* .create_read_view = */ memtx_engine_create_read_view,
1320
        /* .prepare_join = */ memtx_engine_prepare_join,
1321
        /* .join = */ memtx_engine_join,
1322
        /* .complete_join = */ memtx_engine_complete_join,
1323
        /* .begin = */ memtx_engine_begin,
1324
        /* .begin_statement = */ generic_engine_begin_statement,
1325
        /* .prepare = */ memtx_engine_prepare,
1326
        /* .commit = */ memtx_engine_commit,
1327
        /* .rollback_statement = */ memtx_engine_rollback_statement,
1328
        /* .rollback = */ generic_engine_rollback,
1329
        /* .switch_to_ro = */ generic_engine_switch_to_ro,
1330
        /* .bootstrap = */ memtx_engine_bootstrap,
1331
        /* .begin_initial_recovery = */ memtx_engine_begin_initial_recovery,
1332
        /* .begin_final_recovery = */ memtx_engine_begin_final_recovery,
1333
        /* .begin_hot_standby = */ memtx_engine_begin_hot_standby,
1334
        /* .end_recovery = */ memtx_engine_end_recovery,
1335
        /* .begin_checkpoint = */ memtx_engine_begin_checkpoint,
1336
        /* .wait_checkpoint = */ memtx_engine_wait_checkpoint,
1337
        /* .commit_checkpoint = */ memtx_engine_commit_checkpoint,
1338
        /* .abort_checkpoint = */ memtx_engine_abort_checkpoint,
1339
        /* .collect_garbage = */ memtx_engine_collect_garbage,
1340
        /* .backup = */ memtx_engine_backup,
1341
        /* .memory_stat = */ memtx_engine_memory_stat,
1342
        /* .reset_stat = */ generic_engine_reset_stat,
1343
        /* .check_space_def = */ generic_engine_check_space_def,
1344
};
1345

1346
/**
 * Run one iteration of garbage collection. Set @stop if
 * there are no more objects to free.
 */
static void
memtx_engine_run_gc(struct memtx_engine *memtx, bool *stop)
{
        *stop = stailq_empty(&memtx->gc_queue);
        if (*stop)
                return;

        struct memtx_gc_task *task = stailq_first_entry(&memtx->gc_queue,
                                        struct memtx_gc_task, link);
        bool task_done;
        task->vtab->run(task, &task_done);
        if (task_done) {
                stailq_shift(&memtx->gc_queue);
                task->vtab->free(task);
        }
}

static int
memtx_engine_gc_f(va_list va)
{
        struct memtx_engine *memtx = va_arg(va, struct memtx_engine *);
        while (!fiber_is_cancelled()) {
                FiberGCChecker gc_check;
                bool stop;
                ERROR_INJECT_YIELD(ERRINJ_MEMTX_DELAY_GC);
                memtx_engine_run_gc(memtx, &stop);
                if (stop) {
                        fiber_yield_timeout(TIMEOUT_INFINITY);
                        continue;
                }
                /*
                 * Yield after each iteration so as not to block
                 * the tx thread for too long.
                 */
                fiber_sleep(0);
        }
        return 0;
}

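/*
 * A sketch of what feeds the fiber above (the task type and its vtab are
 * declared in memtx_engine.h; the `my_task_*` names and `struct my_object`
 * are purely illustrative): a consumer embeds a struct memtx_gc_task into
 * the object being destroyed, provides run/free callbacks, and hands the
 * task to memtx_engine_schedule_gc() further below, which queues it and
 * wakes gc_fiber.
 *
 *      static void
 *      my_task_run(struct memtx_gc_task *task, bool *done)
 *      {
 *              // Free a bounded batch of objects, set *done when empty.
 *      }
 *
 *      static void
 *      my_task_free(struct memtx_gc_task *task)
 *      {
 *              free(container_of(task, struct my_object, gc_task));
 *      }
 */
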
void
memtx_set_tuple_format_vtab(const char *allocator_name)
{
        if (strncmp(allocator_name, "small", strlen("small")) == 0) {
                memtx_alloc_init<SmallAlloc>();
                create_memtx_tuple_format_vtab<SmallAlloc>
                        (&memtx_tuple_format_vtab);
        } else if (strncmp(allocator_name, "system", strlen("system")) == 0) {
                memtx_alloc_init<SysAlloc>();
                create_memtx_tuple_format_vtab<SysAlloc>
                        (&memtx_tuple_format_vtab);
        } else {
                unreachable();
        }
}

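/*
 * Only the two names checked above are meaningful; anything else hits
 * unreachable(). A selection sketch (whether the string arrives verbatim
 * from the memtx allocator configuration option is an assumption of this
 * note, not something this file shows):
 *
 *      memtx_set_tuple_format_vtab("small");   // SmallAlloc specialization
 *      memtx_set_tuple_format_vtab("system");  // SysAlloc specialization
 */
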
int
memtx_tuple_validate(struct tuple_format *format, struct tuple *tuple)
{
        tuple = memtx_tuple_decompress(tuple);
        if (tuple == NULL)
                return -1;
        tuple_ref(tuple);
        int rc = tuple_validate_raw(format, tuple_data(tuple));
        tuple_unref(tuple);
        return rc;
}

struct memtx_engine *
memtx_engine_new(const char *snap_dirname, bool force_recovery,
                 uint64_t tuple_arena_max_size, uint32_t objsize_min,
                 bool dontdump, unsigned granularity,
                 const char *allocator, float alloc_factor,
                 memtx_on_indexes_built_cb on_indexes_built)
{
        int64_t snap_signature;
        struct memtx_engine *memtx =
                (struct memtx_engine *)calloc(1, sizeof(*memtx));
        if (memtx == NULL) {
                diag_set(OutOfMemory, sizeof(*memtx),
                         "malloc", "struct memtx_engine");
                return NULL;
        }

        xdir_create(&memtx->snap_dir, snap_dirname, SNAP, &INSTANCE_UUID,
                    &xlog_opts_default);
        memtx->snap_dir.force_recovery = force_recovery;

        if (xdir_scan(&memtx->snap_dir, true) != 0)
                goto fail;

        /*
         * To check if the instance needs to be rebootstrapped, we
         * need to connect it to remote peers before proceeding to
         * local recovery. In order to do that, we have to start
         * listening for incoming connections, because one of the
         * remote peers may be self. This, in turn, requires us to
         * know the instance UUID, as it is a part of the greeting
         * message. So if the local directory isn't empty, read the
         * snapshot signature right now to initialize the instance
         * UUID.
         */
        snap_signature = xdir_last_vclock(&memtx->snap_dir, NULL);
        if (snap_signature >= 0) {
                struct xlog_cursor cursor;
                if (xdir_open_cursor(&memtx->snap_dir,
                                     snap_signature, &cursor) != 0)
                        goto fail;
                INSTANCE_UUID = cursor.meta.instance_uuid;
                xlog_cursor_close(&cursor, false);
        }

        /* Apprise the garbage collector of available checkpoints. */
        for (struct vclock *vclock = vclockset_first(&memtx->snap_dir.index);
             vclock != NULL;
             vclock = vclockset_next(&memtx->snap_dir.index, vclock)) {
                gc_add_checkpoint(vclock);
        }

        stailq_create(&memtx->gc_queue);
        memtx->gc_fiber = fiber_new_system("memtx.gc", memtx_engine_gc_f);
        if (memtx->gc_fiber == NULL)
                goto fail;

        /*
         * Currently we have two quota consumers: the tuple and index
         * allocators. The first one uses either SystemAlloc or
         * memtx->slab_cache (for the "system" and "small" allocators
         * respectively), the second one uses memtx->index_slab_cache.
         *
         * In "small" allocator mode a quota smaller than SLAB_SIZE * 2
         * will be exceeded on the first tuple insertion, because both
         * slab_cache and index_slab_cache will request a new
         * SLAB_SIZE-sized slab from memtx->arena on the DDL operation.
         * SLAB_SIZE * 2 is also enough for bootstrap.snap, but this is
         * subject to change. All of the above assumes that SLAB_SIZE >
         * MAX_TUPLE_SIZE + `new tuple allocation overhead`.
         *
         * In "system" allocator mode the amount of memory required to
         * perform the first insert equals SLAB_SIZE + `the size required
         * to store the tuple including MemtxAllocator<SystemAlloc>
         * overhead`.
         *
         * To avoid unnecessary code complexity we enforce a lower bound
         * on the memtx memory we request in any case. The check is not
         * 100% future-proof, but it's not very important so we keep it
         * simple.
         */
        if (tuple_arena_max_size < MIN_MEMORY_QUOTA)
                tuple_arena_max_size = MIN_MEMORY_QUOTA;

        /* Apply the lowest allowed objsize bound. */
        if (objsize_min < OBJSIZE_MIN)
                objsize_min = OBJSIZE_MIN;

        if (alloc_factor > 2) {
                say_error("Alloc factor must be less than or equal to 2.0. It "
                          "will be reduced to 2.0");
                alloc_factor = 2.0;
        } else if (alloc_factor <= 1.0) {
                say_error("Alloc factor must be greater than 1.0. It will be "
                          "increased to 1.001");
                alloc_factor = 1.001;
        }

        /* Initialize the tuple allocator. */
        quota_init(&memtx->quota, tuple_arena_max_size);
        tuple_arena_create(&memtx->arena, &memtx->quota, tuple_arena_max_size,
                           SLAB_SIZE, dontdump, "memtx");
        slab_cache_create(&memtx->slab_cache, &memtx->arena);
        float actual_alloc_factor;
        allocator_settings alloc_settings;
        allocator_settings_init(&alloc_settings, &memtx->slab_cache,
                                objsize_min, granularity, alloc_factor,
                                &actual_alloc_factor, &memtx->quota);
        memtx_allocators_init(&alloc_settings);
        memtx_set_tuple_format_vtab(allocator);

        say_info("Actual slab_alloc_factor calculated on the basis of desired "
                 "slab_alloc_factor = %f", actual_alloc_factor);

        /* Initialize the index extent allocator. */
        slab_cache_create(&memtx->index_slab_cache, &memtx->arena);
        mempool_create(&memtx->index_extent_pool, &memtx->index_slab_cache,
                       MEMTX_EXTENT_SIZE);
        mempool_create(&memtx->iterator_pool, cord_slab_cache(),
                       MEMTX_ITERATOR_SIZE);
        memtx->num_reserved_extents = 0;
        memtx->reserved_extents = NULL;

        memtx->state = MEMTX_INITIALIZED;
        memtx->max_tuple_size = MAX_TUPLE_SIZE;
        memtx->force_recovery = force_recovery;

        memtx->replica_join_cord = NULL;

        memtx->base.vtab = &memtx_engine_vtab;
        memtx->base.name = "memtx";
        memtx->base.flags = ENGINE_SUPPORTS_READ_VIEW;

        memtx->func_key_format = simple_tuple_format_new(
                &memtx_tuple_format_vtab, &memtx->base, NULL, 0);
        if (memtx->func_key_format == NULL) {
                diag_log();
                panic("failed to create functional index key format");
        }
        tuple_format_ref(memtx->func_key_format);

        memtx->on_indexes_built_cb = on_indexes_built;

        fiber_start(memtx->gc_fiber, memtx);
        return memtx;
fail:
        xdir_destroy(&memtx->snap_dir);
        free(memtx);
        return NULL;
}

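/*
 * A construction sketch with illustrative argument values (the real call
 * site and defaults live in the box configuration code, not here). Note the
 * clamping above: a quota below MIN_MEMORY_QUOTA is silently raised to that
 * floor, and an alloc_factor outside (1.0, 2.0] is adjusted with a logged
 * error. The NULL on_indexes_built callback is only for the sketch.
 *
 *      struct memtx_engine *memtx =
 *              memtx_engine_new("./snapshots", false,
 *                               256ULL << 20, OBJSIZE_MIN,
 *                               false, 8, "small", 1.05,
 *                               NULL);
 *      if (memtx == NULL)
 *              diag_log();
 */
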
/** Appends (total, max, avg) stats to info. */
static void
append_total_max_avg_stats(struct info_handler *h, const char *key,
                           size_t total, size_t max, size_t avg)
{
        info_table_begin(h, key);
        info_append_int(h, "total", total);
        info_append_int(h, "max", max);
        info_append_int(h, "avg", avg);
        info_table_end(h);
}

/** Appends (total, count) stats to info. */
static void
append_total_count_stats(struct info_handler *h, const char *key,
                         size_t total, size_t count)
{
        info_table_begin(h, key);
        info_append_int(h, "total", total);
        info_append_int(h, "count", count);
        info_table_end(h);
}

/** Appends memtx tx stats to info. */
static void
memtx_engine_stat_tx(struct memtx_engine *memtx, struct info_handler *h)
{
        (void)memtx;
        struct memtx_tx_statistics stats;
        memtx_tx_statistics_collect(&stats);
        info_table_begin(h, "tx");
        info_table_begin(h, "txn");
        for (size_t i = 0; i < TX_ALLOC_TYPE_MAX; ++i) {
                size_t avg = 0;
                if (stats.txn_count != 0)
                        avg = stats.tx_total[i] / stats.txn_count;
                append_total_max_avg_stats(h, tx_alloc_type_strs[i],
                                           stats.tx_total[i],
                                           stats.tx_max[i], avg);
        }
        info_table_end(h); /* txn */
        info_table_begin(h, "mvcc");
        for (size_t i = 0; i < MEMTX_TX_ALLOC_TYPE_MAX; ++i) {
                size_t avg = 0;
                if (stats.txn_count != 0)
                        avg = stats.memtx_tx_total[i] / stats.txn_count;
                append_total_max_avg_stats(h, memtx_tx_alloc_type_strs[i],
                                           stats.memtx_tx_total[i],
                                           stats.memtx_tx_max[i], avg);
        }
        info_table_begin(h, "tuples");
        for (size_t i = 0; i < MEMTX_TX_STORY_STATUS_MAX; ++i) {
                info_table_begin(h, memtx_tx_story_status_strs[i]);
                append_total_count_stats(h, "stories",
                                         stats.stories[i].total,
                                         stats.stories[i].count);
                append_total_count_stats(h, "retained",
                                         stats.retained_tuples[i].total,
                                         stats.retained_tuples[i].count);
                info_table_end(h);
        }
        info_table_end(h); /* tuples */
        info_table_end(h); /* mvcc */
        info_table_end(h); /* tx */
}

void
memtx_engine_stat(struct memtx_engine *memtx, struct info_handler *h)
{
        info_begin(h);
        memtx_engine_stat_tx(memtx, h);
        info_end(h);
}

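/*
 * Shape of the report produced above, with keys taken from the
 * info_table_begin() calls (whether it is surfaced to users as
 * box.stat.memtx() is an assumption of this note, not something this file
 * defines):
 *
 *      tx:
 *        txn:  { <tx alloc type>: {total, max, avg}, ... }
 *        mvcc:
 *          <memtx tx alloc type>: {total, max, avg}, ...
 *          tuples:
 *            <story status>: { stories:  {total, count},
 *                              retained: {total, count} }
 */
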
void
memtx_engine_schedule_gc(struct memtx_engine *memtx,
                         struct memtx_gc_task *task)
{
        stailq_add_tail_entry(&memtx->gc_queue, task, link);
        fiber_wakeup(memtx->gc_fiber);
}

void
memtx_engine_set_snap_io_rate_limit(struct memtx_engine *memtx, double limit)
{
        memtx->snap_io_rate_limit = limit * 1024 * 1024;
}

int
memtx_engine_set_memory(struct memtx_engine *memtx, size_t size)
{
        if (size < MIN_MEMORY_QUOTA)
                size = MIN_MEMORY_QUOTA;

        if (size < quota_total(&memtx->quota)) {
                diag_set(ClientError, ER_CFG, "memtx_memory",
                         "cannot decrease memory size at runtime");
                return -1;
        }
        quota_set(&memtx->quota, size);
        return 0;
}

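/*
 * A runtime-resize sketch (the new size is hypothetical): growing the quota
 * succeeds, a request below MIN_MEMORY_QUOTA is bumped up to that floor
 * first, and shrinking below the currently configured total fails with
 * ER_CFG ("cannot decrease memory size at runtime").
 *
 *      if (memtx_engine_set_memory(memtx, 512ULL << 20) != 0)
 *              diag_log();
 */
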
void
memtx_engine_set_max_tuple_size(struct memtx_engine *memtx, size_t max_size)
{
        memtx->max_tuple_size = max_size;
}

template<class ALLOC>
static struct tuple *
memtx_tuple_new_raw_impl(struct tuple_format *format, const char *data,
                         const char *end, bool validate)
{
        struct memtx_engine *memtx = (struct memtx_engine *)format->engine;
        assert(mp_typeof(*data) == MP_ARRAY);
        struct tuple *tuple = NULL;
        struct region *region = &fiber()->gc;
        size_t region_svp = region_used(region);
        struct field_map_builder builder;
        size_t total, tuple_len;
        uint32_t data_offset, field_map_size;
        char *raw;
        bool make_compact;
        if (tuple_field_map_create(format, data, validate, &builder) != 0)
                goto end;
        field_map_size = field_map_build_size(&builder);
        data_offset = sizeof(struct tuple) + field_map_size;
        if (tuple_check_data_offset(data_offset) != 0)
                goto end;

        tuple_len = end - data;
        assert(tuple_len <= UINT32_MAX); /* bsize is UINT32_MAX */
        total = sizeof(struct tuple) + field_map_size + tuple_len;

        make_compact = tuple_can_be_compact(data_offset, tuple_len);
        if (make_compact) {
                data_offset -= TUPLE_COMPACT_SAVINGS;
                total -= TUPLE_COMPACT_SAVINGS;
        }

        ERROR_INJECT(ERRINJ_TUPLE_ALLOC, {
                diag_set(OutOfMemory, total, "slab allocator", "memtx_tuple");
                goto end;
        });
        if (unlikely(total > memtx->max_tuple_size)) {
                diag_set(ClientError, ER_MEMTX_MAX_TUPLE_SIZE, total);
                error_log(diag_last_error(diag_get()));
                goto end;
        }

        while ((tuple = MemtxAllocator<ALLOC>::alloc_tuple(total)) == NULL) {
                bool stop;
                memtx_engine_run_gc(memtx, &stop);
                if (stop)
                        break;
        }
        if (tuple == NULL) {
                diag_set(OutOfMemory, total, "slab allocator", "memtx_tuple");
                goto end;
        }
        tuple_create(tuple, 0, tuple_format_id(format),
                     data_offset, tuple_len, make_compact);
        if (format->is_temporary)
                tuple_set_flag(tuple, TUPLE_IS_TEMPORARY);
        tuple_format_ref(format);
        raw = (char *) tuple + data_offset;
        field_map_build(&builder, raw - field_map_size);
        memcpy(raw, data, tuple_len);
end:
        region_truncate(region, region_svp);
        return tuple;
}

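/*
 * The allocation size above, worked through for a hypothetical tuple with a
 * 100-byte msgpack body and an 8-byte field map:
 *
 *      data_offset = sizeof(struct tuple) + 8
 *      total       = sizeof(struct tuple) + 8 + 100
 *
 * If tuple_can_be_compact(data_offset, 100) holds, both data_offset and
 * total shrink by TUPLE_COMPACT_SAVINGS before the allocator is asked for
 * `total` bytes; the field map is then built immediately in front of the
 * raw data copied to `tuple + data_offset`.
 */
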
template<class ALLOC>
static inline struct tuple *
memtx_tuple_new(struct tuple_format *format, const char *data, const char *end)
{
        return memtx_tuple_new_raw_impl<ALLOC>(format, data, end, true);
}

template<class ALLOC>
static void
memtx_tuple_delete(struct tuple_format *format, struct tuple *tuple)
{
        assert(tuple_is_unreferenced(tuple));
        MemtxAllocator<ALLOC>::free_tuple(tuple);
        tuple_format_unref(format);
}

struct tuple_format_vtab memtx_tuple_format_vtab;

template <class ALLOC>
static inline void
create_memtx_tuple_format_vtab(struct tuple_format_vtab *vtab)
{
        vtab->tuple_delete = memtx_tuple_delete<ALLOC>;
        vtab->tuple_new = memtx_tuple_new<ALLOC>;
}

/**
 * Allocate a block of size MEMTX_EXTENT_SIZE for a memtx index.
 */
void *
memtx_index_extent_alloc(void *ctx)
{
        struct memtx_engine *memtx = (struct memtx_engine *)ctx;
        if (memtx->reserved_extents) {
                assert(memtx->num_reserved_extents > 0);
                memtx->num_reserved_extents--;
                void *result = memtx->reserved_extents;
                memtx->reserved_extents = *(void **)memtx->reserved_extents;
                return result;
        }
        ERROR_INJECT(ERRINJ_INDEX_ALLOC, {
                /* same error as in mempool_alloc */
                diag_set(OutOfMemory, MEMTX_EXTENT_SIZE,
                         "mempool", "new slab");
                return NULL;
        });
        void *ret;
        while ((ret = mempool_alloc(&memtx->index_extent_pool)) == NULL) {
                bool stop;
                memtx_engine_run_gc(memtx, &stop);
                if (stop)
                        break;
        }
        if (ret == NULL)
                diag_set(OutOfMemory, MEMTX_EXTENT_SIZE,
                         "mempool", "new slab");
        return ret;
}

/**
 * Free a block previously allocated by memtx_index_extent_alloc().
 */
void
memtx_index_extent_free(void *ctx, void *extent)
{
        struct memtx_engine *memtx = (struct memtx_engine *)ctx;
        return mempool_free(&memtx->index_extent_pool, extent);
}

/**
 * Reserve num extents in the pool.
 * Ensure that the next num calls to memtx_index_extent_alloc()
 * will succeed without an error.
 */
int
memtx_index_extent_reserve(struct memtx_engine *memtx, int num)
{
        ERROR_INJECT(ERRINJ_INDEX_ALLOC, {
                /* same error as in mempool_alloc */
                diag_set(OutOfMemory, MEMTX_EXTENT_SIZE,
                         "mempool", "new slab");
                return -1;
        });
        struct mempool *pool = &memtx->index_extent_pool;
        while (memtx->num_reserved_extents < num) {
                void *ext;
                while ((ext = mempool_alloc(pool)) == NULL) {
                        bool stop;
                        memtx_engine_run_gc(memtx, &stop);
                        if (stop)
                                break;
                }
                if (ext == NULL) {
                        diag_set(OutOfMemory, MEMTX_EXTENT_SIZE,
                                 "mempool", "new slab");
                        return -1;
                }
                *(void **)ext = memtx->reserved_extents;
                memtx->reserved_extents = ext;
                memtx->num_reserved_extents++;
        }
        return 0;
}

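/*
 * Reservation pattern (the extent count is illustrative; real callers in
 * the index code compute the worst case for the operation at hand): reserve
 * everything an operation may need up front so that allocations made while
 * a tree is being modified cannot fail half-way.
 *
 *      if (memtx_index_extent_reserve(memtx, 16) != 0)
 *              return -1;
 *      void *extent = memtx_index_extent_alloc(memtx);
 *
 * Reserved extents form an intrusive singly-linked list: the first
 * sizeof(void *) bytes of every free extent hold the pointer to the next
 * one, so no auxiliary list nodes are required.
 */
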
bool
memtx_index_def_change_requires_rebuild(struct index *index,
                                        const struct index_def *new_def)
{
        struct index_def *old_def = index->def;

        assert(old_def->iid == new_def->iid);
        assert(old_def->space_id == new_def->space_id);

        if (old_def->type != new_def->type)
                return true;
        if (!old_def->opts.is_unique && new_def->opts.is_unique)
                return true;
        if (old_def->opts.func_id != new_def->opts.func_id)
                return true;
        if (old_def->opts.hint != new_def->opts.hint)
                return true;

        const struct key_def *old_cmp_def, *new_cmp_def;
        if (index_depends_on_pk(index)) {
                old_cmp_def = old_def->cmp_def;
                new_cmp_def = new_def->cmp_def;
        } else {
                old_cmp_def = old_def->key_def;
                new_cmp_def = new_def->key_def;
        }

        /*
         * Compatibility of field types is verified by CheckSpaceFormat
         * so it suffices to check that the new key definition indexes
         * the same set of fields in the same order.
         */
        if (old_cmp_def->part_count != new_cmp_def->part_count)
                return true;

        for (uint32_t i = 0; i < new_cmp_def->part_count; i++) {
                const struct key_part *old_part = &old_cmp_def->parts[i];
                const struct key_part *new_part = &new_cmp_def->parts[i];
                if (old_part->fieldno != new_part->fieldno)
                        return true;
                if (old_part->coll != new_part->coll)
                        return true;
                if (json_path_cmp(old_part->path, old_part->path_len,
                                  new_part->path, new_part->path_len,
                                  TUPLE_INDEX_BASE) != 0)
                        return true;
                if (old_part->exclude_null != new_part->exclude_null)
                        return true;
        }
        assert(old_cmp_def->is_multikey == new_cmp_def->is_multikey);
        return false;
}

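/*
 * Illustrative consequences of the checks above (index_def_dup() and
 * index_def_delete() are assumed to come from index_def.h): relaxing
 * uniqueness does not force a rebuild, because only the non-unique ->
 * unique direction is rejected, while changing the index type, a part's
 * field number, collation, JSON path or exclude_null does.
 *
 *      struct index_def *new_def = index_def_dup(index->def);
 *      new_def->opts.is_unique = false;
 *      assert(!memtx_index_def_change_requires_rebuild(index, new_def));
 *      index_def_delete(new_def);
 */
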
int
memtx_prepare_result_tuple(struct tuple **result)
{
        if (*result != NULL) {
                *result = memtx_tuple_decompress(*result);
                if (*result == NULL)
                        return -1;
                tuple_bless(*result);
        }
        return 0;
}

int
memtx_prepare_read_view_tuple(struct tuple *tuple,
                              struct index_read_view *index,
                              struct memtx_tx_snapshot_cleaner *cleaner,
                              struct read_view_tuple *result)
{
        tuple = memtx_tx_snapshot_clarify(cleaner, tuple);
        if (tuple == NULL) {
                *result = read_view_tuple_none();
                return 0;
        }
        result->needs_upgrade = index->space->upgrade == NULL ? false :
                                memtx_read_view_tuple_needs_upgrade(
                                        index->space->upgrade, tuple);
        result->data = tuple_data_range(tuple, &result->size);
        if (!index->space->rv->disable_decompression) {
                result->data = memtx_tuple_decompress_raw(
                                result->data, result->data + result->size,
                                &result->size);
                if (result->data == NULL)
                        return -1;
        }
        return 0;
}

1932
memtx_index_get(struct index *index, const char *key, uint32_t part_count,
1,624,410✔
1933
                struct tuple **result)
1934
{
1935
        if (index->vtab->get_internal(index, key, part_count, result) != 0)
1,624,410!
1936
                return -1;
×
1937
        return memtx_prepare_result_tuple(result);
1,624,410✔
1938
}
1939

1940
int
memtx_iterator_next(struct iterator *it, struct tuple **ret)
{
        if (it->next_internal(it, ret) != 0)
                return -1;
        return memtx_prepare_result_tuple(ret);
}
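
/*
 * A read-path sketch (the index, iterator and process() consumer are
 * hypothetical): point lookups go through memtx_index_get(), scans pull
 * tuples one by one via memtx_iterator_next(); both funnel the result
 * through memtx_prepare_result_tuple(), so callers always see a
 * decompressed, blessed tuple and NULL marks "not found" / end of scan.
 *
 *      struct tuple *res;
 *      if (memtx_index_get(index, key, part_count, &res) != 0)
 *              return -1;
 *      while (memtx_iterator_next(it, &res) == 0 && res != NULL)
 *              process(res);
 */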