
tarantool / tarantool / 12478847095

24 Dec 2024 07:58AM UTC coverage: 85.978% (-0.009%) from 85.987%

push | github | Buristan
test: enable back tests for LuaJIT

Since commit db351d3bc ("luajit: bump new version"), which introduced
CTest as a launcher for LuaJIT's test suites, the LuaJIT tests have
not been run by Tarantool because `LUAJIT_USE_TEST` is disabled. This
patch re-enables all the tests except the ASAN build, which stays
disabled due to #10733.

NO_DOC=testing
NO_CHANGELOG=testing
NO_TEST=enable tests back

(cherry picked from commit 1f09ba66e)

63259 of 114505 branches covered (55.25%)

94117 of 109466 relevant lines covered (85.98%)

2806419.42 hits per line

Source File

/src/box/vinyl.c: 91.96% covered
/*
 * Copyright 2010-2016, Tarantool AUTHORS, please see AUTHORS file.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 * 1. Redistributions of source code must retain the above
 *    copyright notice, this list of conditions and the
 *    following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above
 *    copyright notice, this list of conditions and the following
 *    disclaimer in the documentation and/or other materials
 *    provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
 * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
 * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
#include "vinyl.h"

#include "vy_mem.h"
#include "vy_run.h"
#include "vy_range.h"
#include "vy_lsm.h"
#include "vy_tx.h"
#include "vy_cache.h"
#include "vy_log.h"
#include "vy_upsert.h"
#include "vy_write_iterator.h"
#include "vy_read_iterator.h"
#include "vy_point_lookup.h"
#include "vy_quota.h"
#include "vy_scheduler.h"
#include "vy_regulator.h"
#include "vy_stat.h"

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

#include <small/lsregion.h>
#include <small/region.h>
#include <small/mempool.h>

#include "coio_task.h"
#include "cbus.h"
#include "histogram.h"
#include "xrow_update.h"
#include "txn.h"
#include "xrow.h"
#include "xlog.h"
#include "engine.h"
#include "space.h"
#include "index.h"
#include "schema.h"
#include "xstream.h"
#include "info/info.h"
#include "column_mask.h"
#include "trigger.h"
#include "wal.h" /* wal_mode() */

/**
 * Yield after iterating over this many objects (e.g. ranges).
 * Yield more often in debug mode.
 */
#if defined(NDEBUG)
enum { VY_YIELD_LOOPS = 128 };
#else
enum { VY_YIELD_LOOPS = 2 };
#endif
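
/*
 * Editor's note: an illustration of how VY_YIELD_LOOPS is meant to
 * be used; this exact pattern appears in vinyl_space_check_format()
 * further down in this file:
 *
 *   if (++loops % VY_YIELD_LOOPS == 0)
 *           fiber_sleep(0);
 */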

struct vy_squash_queue;

enum vy_status {
        VINYL_OFFLINE,
        VINYL_INITIAL_RECOVERY_LOCAL,
        VINYL_INITIAL_RECOVERY_REMOTE,
        VINYL_FINAL_RECOVERY_LOCAL,
        VINYL_FINAL_RECOVERY_REMOTE,
        VINYL_HOT_STANDBY,
        VINYL_ONLINE,
};

struct vy_env {
        struct engine base;
        /** Recovery status */
        enum vy_status status;
        /** TX manager */
        struct vy_tx_manager *xm;
        /** Upsert squash queue */
        struct vy_squash_queue *squash_queue;
        /** Memory pool for index iterator. */
        struct mempool iterator_pool;
        /** Memory quota */
        struct vy_quota     quota;
        /** Statement environment. */
        struct vy_stmt_env stmt_env;
        /** Common LSM tree environment. */
        struct vy_lsm_env lsm_env;
        /** Environment for cache subsystem */
        struct vy_cache_env cache_env;
        /** Environment for run subsystem */
        struct vy_run_env run_env;
        /** Environment for memory subsystem. */
        struct vy_mem_env mem_env;
        /** Scheduler */
        struct vy_scheduler scheduler;
        /** Load regulator. */
        struct vy_regulator regulator;
        /** Local recovery context. */
        struct vy_recovery *recovery;
        /** Local recovery vclock. */
        const struct vclock *recovery_vclock;
        /** Path to the data directory. */
        char *path;
        /** Max time a transaction may wait for memory. */
        double timeout;
        /** Try to recover corrupted data if set. */
        bool force_recovery;
};

/** Mask passed to vy_gc(). */
enum {
        /** Delete incomplete runs. */
        VY_GC_INCOMPLETE        = 1 << 0,
        /** Delete dropped runs. */
        VY_GC_DROPPED                = 1 << 1,
};

static void
vy_gc(struct vy_env *env, struct vy_recovery *recovery,
      unsigned int gc_mask, int64_t gc_lsn);

struct vinyl_iterator {
        struct iterator base;
        /** Memory pool the iterator was allocated from. */
        struct mempool *pool;
        /**
         * Iterator position for pagination. Set to none when
         * the iterator is created. Points to the last non-none
         * entry returned by the iterator.
         */
        struct vy_entry pos;
        /**
         * Points either to tx_autocommit for autocommit mode
         * or to a multi-statement transaction active when the
         * iterator was created.
         */
        struct vy_tx *tx;
        /** Search key. */
        struct vy_entry key;
        /** Vinyl read iterator. */
        struct vy_read_iterator iterator;
        /**
         * Built-in transaction created when iterator is opened
         * in autocommit mode.
         */
        struct vy_tx tx_autocommit;
        /** Trigger invoked when tx ends to close the iterator. */
        struct trigger on_tx_destroy;
};

static const struct engine_vtab vinyl_engine_vtab;
static const struct space_vtab vinyl_space_vtab;
static const struct index_vtab vinyl_index_vtab;

static struct trigger on_replace_vinyl_deferred_delete;

/** Extract vy_env from an engine object. */
static inline struct vy_env *
vy_env(struct engine *engine)
{
        assert(engine->vtab == &vinyl_engine_vtab);
        return (struct vy_env *)engine;
}

/** Extract vy_lsm from an index object. */
struct vy_lsm *
vy_lsm(struct index *index)
{
        assert(index->vtab == &vinyl_index_vtab);
        return (struct vy_lsm *)index;
}

/**
 * A quick intro into Vinyl cosmology and file format
 * --------------------------------------------------
 * A single vinyl index on disk consists of a set of "range"
 * objects. A range contains a sorted set of index keys;
 * keys in different ranges do not overlap and all ranges of the
 * same index together span the whole key space, for example:
 * (-inf..100), [100..114), [114..304), [304..inf)
 *
 * A sorted set of keys in a range is called a run. A single
 * range may contain multiple runs, each run contains changes of
 * keys in the range over a certain period of time. The periods do
 * not overlap, while, of course, two runs of the same range may
 * contain changes of the same key.
 * All keys in a run are sorted and split between pages of
 * approximately equal size. The purpose of putting keys into
 * pages is a quicker key lookup, since (min,max) key of every
 * page is put into the page index, stored at the beginning of each
 * run. The page index of an active run is fully cached in RAM.
 *
 * All files of an index have the following name pattern:
 * <run_id>.{run,index}
 * and are stored together in the index directory.
 *
 * Files that end with '.index' store page index (see vy_run_info)
 * while '.run' files store vinyl statements.
 *
 * <run_id> is the unique id of this run. Newer runs have greater ids.
 *
 * Information about which run id belongs to which range is stored
 * in vinyl.meta file.
 */
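
/*
 * Editor's note: an illustrative sketch of the layout described
 * above; the run ids are made up. An index directory holding two
 * runs with ids 5 and 7 would contain:
 *
 *   5.run, 5.index, 7.run, 7.index
 *
 * where run 7, having the greater id, holds the newer changes, and
 * each '.index' file stores the (min,max) page index that is kept
 * fully cached in RAM for active runs.
 */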

/** {{{ Introspection */

static void
vy_info_append_scheduler(struct vy_env *env, struct info_handler *h)
{
        struct vy_scheduler_stat *stat = &env->scheduler.stat;

        info_table_begin(h, "scheduler");
        info_append_int(h, "tasks_inprogress", stat->tasks_inprogress);
        info_append_int(h, "tasks_completed", stat->tasks_completed);
        info_append_int(h, "tasks_failed", stat->tasks_failed);
        info_append_int(h, "dump_count", stat->dump_count);
        info_append_double(h, "dump_time", stat->dump_time);
        info_append_int(h, "dump_input", stat->dump_input);
        info_append_int(h, "dump_output", stat->dump_output);
        info_append_double(h, "compaction_time", stat->compaction_time);
        info_append_int(h, "compaction_input", stat->compaction_input);
        info_append_int(h, "compaction_output", stat->compaction_output);
        info_append_int(h, "compaction_queue",
                        env->lsm_env.compaction_queue_size);
        info_table_end(h); /* scheduler */
}

static void
vy_info_append_regulator(struct vy_env *env, struct info_handler *h)
{
        struct vy_regulator *r = &env->regulator;

        info_table_begin(h, "regulator");
        info_append_int(h, "write_rate", r->write_rate);
        info_append_int(h, "dump_bandwidth", r->dump_bandwidth);
        info_append_int(h, "dump_watermark", r->dump_watermark);
        info_append_int(h, "rate_limit", vy_quota_get_rate_limit(r->quota,
                                                        VY_QUOTA_CONSUMER_TX));
        info_append_int(h, "blocked_writers", r->quota->n_blocked);
        info_table_end(h); /* regulator */
}

static void
vy_info_append_tx(struct vy_env *env, struct info_handler *h)
{
        struct vy_tx_manager *xm = env->xm;

        info_table_begin(h, "tx");

        info_append_int(h, "commit", xm->stat.commit);
        info_append_int(h, "rollback", xm->stat.rollback);
        info_append_int(h, "conflict", xm->stat.conflict);

        struct mempool_stats mstats;
        mempool_stats(&xm->tx_mempool, &mstats);
        info_append_int(h, "transactions", mstats.objcount);
        mempool_stats(&xm->txv_mempool, &mstats);
        info_append_int(h, "statements", mstats.objcount);
        mempool_stats(&xm->read_interval_mempool, &mstats);
        info_append_int(h, "gap_locks", mstats.objcount);
        mempool_stats(&xm->read_view_mempool, &mstats);
        info_append_int(h, "read_views", mstats.objcount);

        info_table_end(h); /* tx */
}

static void
vy_info_append_memory(struct vy_env *env, struct info_handler *h)
{
        info_table_begin(h, "memory");
        info_append_int(h, "tx", vy_tx_manager_mem_used(env->xm));
        info_append_int(h, "level0", lsregion_used(&env->mem_env.allocator));
        info_append_int(h, "tuple_cache", env->cache_env.mem_used);
        info_append_int(h, "page_index", env->lsm_env.page_index_size);
        info_append_int(h, "bloom_filter", env->lsm_env.bloom_size);
        info_table_end(h); /* memory */
}

static void
vy_info_append_disk(struct vy_env *env, struct info_handler *h)
{
        info_table_begin(h, "disk");
        info_append_int(h, "data", env->lsm_env.disk_data_size);
        info_append_int(h, "index", env->lsm_env.disk_index_size);
        info_append_int(h, "data_compacted", env->lsm_env.compacted_data_size);
        info_table_end(h); /* disk */
}

void
vinyl_engine_stat(struct engine *engine, struct info_handler *h)
{
        struct vy_env *env = vy_env(engine);

        info_begin(h);
        vy_info_append_tx(env, h);
        vy_info_append_memory(env, h);
        vy_info_append_disk(env, h);
        vy_info_append_scheduler(env, h);
        vy_info_append_regulator(env, h);
        info_end(h);
}
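
/*
 * Editor's note: a hedged sketch of the report assembled by
 * vinyl_engine_stat() above, roughly what box.stat.vinyl() renders
 * in Lua; only the key names come from the info_append_*() calls:
 *
 *   tx:        commit, rollback, conflict, transactions, statements,
 *              gap_locks, read_views
 *   memory:    tx, level0, tuple_cache, page_index, bloom_filter
 *   disk:      data, index, data_compacted
 *   scheduler: tasks_inprogress, tasks_completed, tasks_failed,
 *              dump_*, compaction_*
 *   regulator: write_rate, dump_bandwidth, dump_watermark,
 *              rate_limit, blocked_writers
 */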

static void
vy_info_append_stmt_counter(struct info_handler *h, const char *name,
                            const struct vy_stmt_counter *count)
{
        if (name != NULL)
                info_table_begin(h, name);
        info_append_int(h, "rows", count->rows);
        info_append_int(h, "bytes", count->bytes);
        if (name != NULL)
                info_table_end(h);
}

static void
vy_info_append_disk_stmt_counter(struct info_handler *h, const char *name,
                                 const struct vy_disk_stmt_counter *count)
{
        if (name != NULL)
                info_table_begin(h, name);
        info_append_int(h, "rows", count->rows);
        info_append_int(h, "bytes", count->bytes);
        info_append_int(h, "bytes_compressed", count->bytes_compressed);
        info_append_int(h, "pages", count->pages);
        if (name != NULL)
                info_table_end(h);
}

static void
vinyl_index_stat(struct index *index, struct info_handler *h)
{
        char buf[1024];
        struct vy_lsm *lsm = vy_lsm(index);
        struct vy_lsm_stat *stat = &lsm->stat;
        struct vy_cache_stat *cache_stat = &lsm->cache.stat;

        info_begin(h);

        struct vy_stmt_counter count = stat->memory.count;
        vy_stmt_counter_add_disk(&count, &stat->disk.count);
        vy_info_append_stmt_counter(h, NULL, &count);

        info_append_int(h, "lookup", stat->lookup);
        vy_info_append_stmt_counter(h, "get", &stat->get);
        vy_info_append_stmt_counter(h, "skip", &stat->skip);
        vy_info_append_stmt_counter(h, "put", &stat->put);

        info_table_begin(h, "latency");
        info_append_double(h, "p50", latency_get(&stat->latency, 50));
        info_append_double(h, "p75", latency_get(&stat->latency, 75));
        info_append_double(h, "p90", latency_get(&stat->latency, 90));
        info_append_double(h, "p95", latency_get(&stat->latency, 95));
        info_append_double(h, "p99", latency_get(&stat->latency, 99));
        info_table_end(h); /* latency */

        info_table_begin(h, "upsert");
        info_append_int(h, "squashed", stat->upsert.squashed);
        info_append_int(h, "applied", stat->upsert.applied);
        info_table_end(h); /* upsert */

        info_table_begin(h, "memory");
        vy_info_append_stmt_counter(h, NULL, &stat->memory.count);
        info_table_begin(h, "iterator");
        info_append_int(h, "lookup", stat->memory.iterator.lookup);
        vy_info_append_stmt_counter(h, "get", &stat->memory.iterator.get);
        info_table_end(h); /* iterator */
        info_append_int(h, "index_size", vy_lsm_mem_tree_size(lsm));
        info_table_end(h); /* memory */

        info_table_begin(h, "disk");
        vy_info_append_disk_stmt_counter(h, NULL, &stat->disk.count);
        vy_info_append_disk_stmt_counter(h, "last_level",
                                         &stat->disk.last_level_count);
        info_table_begin(h, "statement");
        info_append_int(h, "inserts", stat->disk.stmt.inserts);
        info_append_int(h, "replaces", stat->disk.stmt.replaces);
        info_append_int(h, "deletes", stat->disk.stmt.deletes);
        info_append_int(h, "upserts", stat->disk.stmt.upserts);
        info_table_end(h); /* statement */
        info_table_begin(h, "iterator");
        info_append_int(h, "lookup", stat->disk.iterator.lookup);
        vy_info_append_stmt_counter(h, "get", &stat->disk.iterator.get);
        vy_info_append_disk_stmt_counter(h, "read", &stat->disk.iterator.read);
        info_table_begin(h, "bloom");
        info_append_int(h, "hit", stat->disk.iterator.bloom_hit);
        info_append_int(h, "miss", stat->disk.iterator.bloom_miss);
        info_table_end(h); /* bloom */
        info_table_end(h); /* iterator */
        info_table_begin(h, "dump");
        info_append_int(h, "count", stat->disk.dump.count);
        info_append_double(h, "time", stat->disk.dump.time);
        vy_info_append_stmt_counter(h, "input", &stat->disk.dump.input);
        vy_info_append_disk_stmt_counter(h, "output", &stat->disk.dump.output);
        info_table_end(h); /* dump */
        info_table_begin(h, "compaction");
        info_append_int(h, "count", stat->disk.compaction.count);
        info_append_double(h, "time", stat->disk.compaction.time);
        vy_info_append_disk_stmt_counter(h, "input", &stat->disk.compaction.input);
        vy_info_append_disk_stmt_counter(h, "output", &stat->disk.compaction.output);
        vy_info_append_disk_stmt_counter(h, "queue", &stat->disk.compaction.queue);
        info_table_end(h); /* compaction */
        info_append_int(h, "index_size", lsm->page_index_size);
        info_append_int(h, "bloom_size", lsm->bloom_size);
        info_table_end(h); /* disk */

        info_table_begin(h, "cache");
        vy_info_append_stmt_counter(h, NULL, &cache_stat->count);
        info_append_int(h, "lookup", cache_stat->lookup);
        vy_info_append_stmt_counter(h, "get", &cache_stat->get);
        vy_info_append_stmt_counter(h, "put", &cache_stat->put);
        vy_info_append_stmt_counter(h, "invalidate", &cache_stat->invalidate);
        vy_info_append_stmt_counter(h, "evict", &cache_stat->evict);
        info_append_int(h, "index_size",
                        vy_cache_tree_mem_used(&lsm->cache.cache_tree));
        info_table_end(h); /* cache */

        info_table_begin(h, "txw");
        vy_info_append_stmt_counter(h, NULL, &stat->txw.count);
        info_table_begin(h, "iterator");
        info_append_int(h, "lookup", stat->txw.iterator.lookup);
        vy_info_append_stmt_counter(h, "get", &stat->txw.iterator.get);
        info_table_end(h); /* iterator */
        info_table_end(h); /* txw */

        info_append_int(h, "range_size", vy_lsm_range_size(lsm));
        info_append_int(h, "range_count", lsm->range_count);
        info_append_int(h, "run_count", lsm->run_count);
        info_append_int(h, "run_avg", lsm->run_count / lsm->range_count);
        histogram_snprint(buf, sizeof(buf), lsm->run_hist);
        info_append_str(h, "run_histogram", buf);
        info_append_int(h, "dumps_per_compaction",
                        vy_lsm_dumps_per_compaction(lsm));

        info_end(h);
}

static void
vinyl_index_reset_stat(struct index *index)
{
        struct vy_lsm *lsm = vy_lsm(index);
        struct vy_lsm_stat *stat = &lsm->stat;
        struct vy_cache_stat *cache_stat = &lsm->cache.stat;

        /* Get/put */
        stat->lookup = 0;
        latency_reset(&stat->latency);
        vy_stmt_counter_reset(&stat->get);
        vy_stmt_counter_reset(&stat->skip);
        vy_stmt_counter_reset(&stat->put);
        memset(&stat->upsert, 0, sizeof(stat->upsert));

        /* Iterator */
        memset(&stat->txw.iterator, 0, sizeof(stat->txw.iterator));
        memset(&stat->memory.iterator, 0, sizeof(stat->memory.iterator));
        memset(&stat->disk.iterator, 0, sizeof(stat->disk.iterator));

        /* Dump */
        stat->disk.dump.count = 0;
        stat->disk.dump.time = 0;
        vy_stmt_counter_reset(&stat->disk.dump.input);
        vy_disk_stmt_counter_reset(&stat->disk.dump.output);

        /* Compaction */
        stat->disk.compaction.count = 0;
        stat->disk.compaction.time = 0;
        vy_disk_stmt_counter_reset(&stat->disk.compaction.input);
        vy_disk_stmt_counter_reset(&stat->disk.compaction.output);

        /* Cache */
        cache_stat->lookup = 0;
        vy_stmt_counter_reset(&cache_stat->get);
        vy_stmt_counter_reset(&cache_stat->put);
        vy_stmt_counter_reset(&cache_stat->invalidate);
        vy_stmt_counter_reset(&cache_stat->evict);
}

static void
vinyl_engine_memory_stat(struct engine *engine, struct engine_memory_stat *stat)
{
        struct vy_env *env = vy_env(engine);

        stat->data += lsregion_used(&env->mem_env.allocator) -
                                env->mem_env.tree_extent_size;
        stat->index += env->mem_env.tree_extent_size;
        stat->index += env->lsm_env.bloom_size;
        stat->index += env->lsm_env.page_index_size;
        stat->cache += env->cache_env.mem_used;
        stat->tx += vy_tx_manager_mem_used(env->xm);
}

static void
vinyl_engine_reset_stat(struct engine *engine)
{
        struct vy_env *env = vy_env(engine);

        struct vy_tx_manager *xm = env->xm;
        memset(&xm->stat, 0, sizeof(xm->stat));

        vy_scheduler_reset_stat(&env->scheduler);
        vy_regulator_reset_stat(&env->regulator);
}

/** }}} Introspection */

/**
 * Check if WAL is enabled.
 *
 * Vinyl needs to log all operations done on indexes in its own
 * journal - vylog. If we allowed using it in conjunction with
 * wal_mode = 'none', vylog and WAL could get out of sync, which
 * can result in weird recovery errors. So we forbid DML/DDL
 * operations in case WAL is disabled.
 */
static inline int
vinyl_check_wal(struct vy_env *env, const char *what)
{
        if (env->status == VINYL_ONLINE && wal_mode() == WAL_NONE) {
                diag_set(ClientError, ER_UNSUPPORTED, "Vinyl",
                         tt_sprintf("%s if wal_mode = 'none'", what));
                return -1;
        }
        return 0;
}
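
/*
 * Editor's note: with ER_UNSUPPORTED the guard above produces an
 * error along the lines of "Vinyl does not support DDL if
 * wal_mode = 'none'" (the exact wording depends on the
 * ER_UNSUPPORTED format string). Callers simply propagate the
 * failure, as vinyl_space_prepare_alter() does below:
 *
 *   if (vinyl_check_wal(env, "DDL") != 0)
 *           return -1;
 */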

/**
 * Given a space and an index id, return vy_lsm.
 * If index not found, return NULL and set diag.
 */
static struct vy_lsm *
vy_lsm_find(struct space *space, uint32_t iid)
{
        struct index *index = index_find(space, iid);
        if (index == NULL)
                return NULL;
        return vy_lsm(index);
}

/**
 * Wrapper around vy_lsm_find() which ensures that
 * the found index is unique.
 */
static struct vy_lsm *
vy_lsm_find_unique(struct space *space, uint32_t index_id)
{
        struct vy_lsm *lsm = vy_lsm_find(space, index_id);
        if (lsm != NULL && !lsm->opts.is_unique) {
                diag_set(ClientError, ER_MORE_THAN_ONE_TUPLE);
                return NULL;
        }
        return lsm;
}

static int
vinyl_engine_check_space_def(struct space_def *def)
{
        for (uint32_t i = 0; i < def->field_count; i++) {
                if (def->fields[i].compression_type != COMPRESSION_TYPE_NONE) {
                        diag_set(ClientError, ER_UNSUPPORTED,
                                 "Vinyl", "compression");
                        return -1;
                }
        }
        if (def->opts.is_temporary) {
                diag_set(ClientError, ER_ALTER_SPACE,
                         def->name, "engine does not support temporary flag");
                return -1;
        }
        return 0;
}

/** Create a vinyl space statement format. */
static struct tuple_format *
vy_space_stmt_format_new(struct vy_stmt_env *env, struct key_def *const *keys,
                         uint16_t key_count, struct space_def *space_def)
{
        return space_tuple_format_new(&env->tuple_format_vtab,
                                      env, keys, key_count, space_def);
}

static struct space *
vinyl_engine_create_space(struct engine *engine, struct space_def *def,
                          struct rlist *key_list)
{
        struct vy_env *env = vy_env(engine);
        struct space *space = malloc(sizeof(*space));
        if (space == NULL) {
                diag_set(OutOfMemory, sizeof(*space),
                         "malloc", "struct space");
                return NULL;
        }

        /* Create a format from key and field definitions. */
        int key_count = 0;
        struct index_def *index_def;
        rlist_foreach_entry(index_def, key_list, link)
                key_count++;
        struct key_def **keys = NULL;
        size_t region_svp = region_used(&fiber()->gc);
        if (key_count > 0)
                keys = xregion_alloc_array(&fiber()->gc, typeof(keys[0]),
                                           key_count);
        key_count = 0;
        rlist_foreach_entry(index_def, key_list, link)
                keys[key_count++] = index_def->key_def;

        struct tuple_format *format;
        format = vy_space_stmt_format_new(&env->stmt_env, keys, key_count, def);
        region_truncate(&fiber()->gc, region_svp);
        if (format == NULL) {
                free(space);
                return NULL;
        }
        tuple_format_ref(format);

        if (space_create(space, engine, &vinyl_space_vtab,
                         def, key_list, format) != 0) {
                tuple_format_unref(format);
                free(space);
                return NULL;
        }

        /* Format is now referenced by the space. */
        tuple_format_unref(format);
        return space;
}

static void
vinyl_space_destroy(struct space *space)
{
        TRASH(space);
        free(space);
}

static int
vinyl_space_check_index_def(struct space *space, struct index_def *index_def)
{
        if (index_def->type != TREE) {
                diag_set(ClientError, ER_INDEX_TYPE,
                         index_def->name, space_name(space));
                return -1;
        }
        if (index_def->opts.hint == INDEX_HINT_ON &&
            recovery_state == FINISHED_RECOVERY) {
                /*
                 * The error is silenced during recovery to be able to recover
                 * the indexes with incorrect hint options.
                 */
                diag_set(ClientError, ER_MODIFY_INDEX, index_def->name,
                         space_name(space),
                         "hint is only reasonable with memtx tree index");
                return -1;
        }

        struct key_def *key_def = index_def->key_def;

        if (key_def->is_nullable && index_def->iid == 0) {
                diag_set(ClientError, ER_NULLABLE_PRIMARY, space_name(space));
                return -1;
        }
        /* Check that there are no ANY, ARRAY, MAP parts */
        for (uint32_t i = 0; i < key_def->part_count; i++) {
                struct key_part *part = &key_def->parts[i];
                if (part->type <= FIELD_TYPE_ANY ||
                    part->type >= FIELD_TYPE_ARRAY) {
                        diag_set(ClientError, ER_MODIFY_INDEX,
                                 index_def->name, space_name(space),
                                 tt_sprintf("field type '%s' is not supported",
                                            field_type_strs[part->type]));
                        return -1;
                }
        }
        if (key_def->for_func_index) {
                diag_set(ClientError, ER_UNSUPPORTED, "Vinyl",
                         "functional index");
                return -1;
        }
        return 0;
}

static struct index *
vinyl_space_create_index(struct space *space, struct index_def *index_def)
{
        assert(index_def->type == TREE);
        struct vy_env *env = vy_env(space->engine);
        struct vy_lsm *pk = NULL;
        if (index_def->iid > 0) {
                pk = vy_lsm(space_index(space, 0));
                assert(pk != NULL);
        }
        struct vy_lsm *lsm = vy_lsm_new(&env->lsm_env, &env->cache_env,
                                        &env->mem_env, index_def, space->format,
                                        pk, space_group_id(space));
        if (lsm == NULL)
                return NULL;

        index_create(&lsm->base, &env->base, &vinyl_index_vtab, index_def);
        return &lsm->base;
}

static void
vinyl_index_destroy(struct index *index)
{
        struct vy_lsm *lsm = vy_lsm(index);
        vy_lsm_delete(lsm);
}

/**
 * Detect whether we already have non-garbage index files,
 * and open an existing index if that's the case. Otherwise,
 * create a new index. Take the current recovery status into
 * account.
 */
static int
vinyl_index_open(struct index *index)
{
        struct vy_env *env = vy_env(index->engine);
        struct vy_lsm *lsm = vy_lsm(index);

        /* Ensure vinyl data directory exists. */
        if (access(env->path, F_OK) != 0) {
                diag_set(SystemError, "can not access vinyl data directory");
                return -1;
        }
        switch (env->status) {
        case VINYL_ONLINE:
                /*
                 * The recovery is complete, simply
                 * create a new index.
                 */
                if (vy_lsm_create(lsm) != 0)
                        return -1;
                /* Make sure reader threads are up and running. */
                vy_run_env_enable_coio(&env->run_env);
                break;
        case VINYL_INITIAL_RECOVERY_REMOTE:
        case VINYL_FINAL_RECOVERY_REMOTE:
                /*
                 * Remote recovery. The index files do not
                 * exist locally, and we should create the
                 * index directory from scratch.
                 */
                if (vy_lsm_create(lsm) != 0)
                        return -1;
                break;
        case VINYL_INITIAL_RECOVERY_LOCAL:
        case VINYL_FINAL_RECOVERY_LOCAL:
                /*
                 * Local WAL replay or recovery from snapshot.
                 * In either case the index directory should
                 * have already been created, so try to load
                 * the index files from it.
                 */
                if (vy_lsm_recover(lsm, env->recovery, &env->run_env,
                                   vclock_sum(env->recovery_vclock),
                                   env->status == VINYL_INITIAL_RECOVERY_LOCAL,
                                   env->force_recovery) != 0)
                        return -1;
                break;
        case VINYL_HOT_STANDBY:
                /* See the comment to vinyl_engine_begin_hot_standby(). */
                diag_set(ClientError, ER_UNSUPPORTED, "Vinyl",
                         "hot standby mode");
                return -1;
        default:
                unreachable();
        }
        /*
         * Add the new LSM tree to the scheduler so that it can
         * be dumped and compacted.
         */
        return vy_scheduler_add_lsm(&env->scheduler, lsm);
}

static void
vinyl_index_commit_create(struct index *index, int64_t lsn)
{
        struct vy_env *env = vy_env(index->engine);
        struct vy_lsm *lsm = vy_lsm(index);

        assert(lsm->id >= 0);

        if (env->status == VINYL_INITIAL_RECOVERY_LOCAL ||
            env->status == VINYL_FINAL_RECOVERY_LOCAL) {
                /*
                 * Normally, if this is local recovery, the index
                 * should have been logged before restart. There's
                 * one exception though - we could've failed to log
                 * index due to a vylog write error, in which case
                 * the index isn't in the recovery context and we
                 * need to retry to log it now.
                 */
                if (lsm->commit_lsn >= 0)
                        return;
        }

        /*
         * Backward compatibility fixup: historically, we used
         * box.info.signature for LSN of index creation, which
         * lags behind the LSN of the record that created the
         * index by 1. So for legacy indexes use the LSN from
         * index options.
         */
        if (lsm->opts.lsn != 0)
                lsn = lsm->opts.lsn;

        assert(lsm->commit_lsn < 0);
        lsm->commit_lsn = lsn;

        /*
         * Since it's too late to fail now, in case of vylog write
         * failure we leave the records we attempted to write in
         * the log buffer so that they are flushed along with the
         * next write request. If they don't get flushed before
         * the instance is shut down, we will replay them on local
         * recovery.
         */
        vy_log_tx_begin();
        vy_log_create_lsm(lsm->id, lsn);
        vy_log_tx_try_commit();
}

static void
vinyl_index_abort_create(struct index *index)
{
        struct vy_env *env = vy_env(index->engine);
        struct vy_lsm *lsm = vy_lsm(index);

        if (env->status != VINYL_ONLINE) {
                /* Failure during recovery. Nothing to do. */
                return;
        }
        if (lsm->id < 0) {
                /*
                 * ALTER failed before we wrote information about
                 * the new LSM tree to vylog, see vy_lsm_create().
                 * Nothing to do.
                 */
                return;
        }

        lsm->is_dropped = true;

        vy_log_tx_begin();
        vy_log_drop_lsm(lsm->id, 0);
        vy_log_tx_try_commit();
}

static void
vinyl_index_commit_modify(struct index *index, int64_t lsn)
{
        struct vy_env *env = vy_env(index->engine);
        struct vy_lsm *lsm = vy_lsm(index);

        (void)env;
        assert(env->status == VINYL_ONLINE ||
               env->status == VINYL_FINAL_RECOVERY_LOCAL ||
               env->status == VINYL_FINAL_RECOVERY_REMOTE);

        if (env->status == VINYL_FINAL_RECOVERY_LOCAL &&
            lsn <= lsm->commit_lsn) {
                /*
                 * The statement we are recovering from WAL has
                 * been successfully written to vylog so we must
                 * not replay it.
                 */
                return;
        }

        assert(lsm->commit_lsn <= lsn);
        lsm->commit_lsn = lsn;

        vy_log_tx_begin();
        vy_log_modify_lsm(lsm->id, lsm->key_def, lsn);
        vy_log_tx_try_commit();
}

static void
vinyl_index_commit_drop(struct index *index, int64_t lsn)
{
        struct vy_env *env = vy_env(index->engine);
        struct vy_lsm *lsm = vy_lsm(index);

        /*
         * We can't abort here, because the index drop request has
         * already been written to WAL. So if we fail to write the
         * change to the metadata log, we leave it in the log buffer,
         * to be flushed along with the next transaction. If it is
         * not flushed before the instance is shut down, we replay it
         * on local recovery from WAL.
         */
        if (env->status == VINYL_FINAL_RECOVERY_LOCAL && lsm->is_dropped)
                return;

        lsm->is_dropped = true;

        vy_log_tx_begin();
        vy_log_drop_lsm(lsm->id, lsn);
        vy_log_tx_try_commit();
}

static void
vinyl_index_update_def(struct index *index)
{
        struct vy_lsm *lsm = vy_lsm(index);
        lsm->opts = index->def->opts;
        /*
         * Sic: We copy key definitions in-place instead of reallocating them
         * because they may be used by read iterators by pointer, for example,
         * see vy_run_iterator.
         */
        key_def_copy(lsm->key_def, index->def->key_def);
        key_def_copy(lsm->cmp_def, index->def->cmp_def);
}

static bool
vinyl_index_depends_on_pk(struct index *index)
{
        (void)index;
        /*
         * All secondary Vinyl indexes are non-clustered and hence
         * have to be updated if the primary key is modified.
         */
        return true;
}

static bool
vinyl_index_def_change_requires_rebuild(struct index *index,
                                        const struct index_def *new_def)
{
        struct index_def *old_def = index->def;

        assert(old_def->iid == new_def->iid);
        assert(old_def->space_id == new_def->space_id);
        assert(old_def->type == TREE && new_def->type == TREE);

        if (!old_def->opts.is_unique && new_def->opts.is_unique)
                return true;
        if (old_def->opts.func_id != new_def->opts.func_id)
                return true;

        assert(index_depends_on_pk(index));
        const struct key_def *old_key_def = old_def->key_def;
        const struct key_def *old_cmp_def = old_def->cmp_def;
        const struct key_def *new_key_def = new_def->key_def;
        const struct key_def *new_cmp_def = new_def->cmp_def;

        /*
         * It is not enough to check only fieldno in case of Vinyl,
         * because the index may store some overwritten or deleted
         * statements conforming to the old format. CheckSpaceFormat
         * won't reveal such statements, but we may still need to
         * compare them to statements inserted after ALTER hence
         * we can't narrow field types without index rebuild.
         *
         * Sic: If secondary index key parts are extended with primary
         * index key parts, cmp_def (hence the sorting order) will stay
         * the same, but we still have to rebuild the index because
         * the new key_def has more parts so we can't update it in-place,
         * see vinyl_index_update_def().
         */
        if (old_key_def->part_count != new_key_def->part_count ||
            old_cmp_def->part_count != new_cmp_def->part_count)
                return true;

        for (uint32_t i = 0; i < new_cmp_def->part_count; i++) {
                const struct key_part *old_part = &old_cmp_def->parts[i];
                const struct key_part *new_part = &new_cmp_def->parts[i];
                if (old_part->fieldno != new_part->fieldno)
                        return true;
                if (old_part->coll != new_part->coll)
                        return true;
                if (key_part_is_nullable(old_part) &&
                    !key_part_is_nullable(new_part))
                        return true;
                if (!field_type1_contains_type2(new_part->type, old_part->type))
                        return true;
                if (json_path_cmp(old_part->path, old_part->path_len,
                                  new_part->path, new_part->path_len,
                                  TUPLE_INDEX_BASE) != 0)
                        return true;
                if (old_part->exclude_null != new_part->exclude_null)
                        return true;
        }
        assert(old_cmp_def->is_multikey == new_cmp_def->is_multikey);
        return false;
}
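
/*
 * Editor's note: a hedged example of the type check above. Since
 * field_type1_contains_type2() only allows widening, changing a key
 * part from 'unsigned' to 'integer' can be done in place, while the
 * reverse change forces a rebuild: old runs may still hold
 * statements with negative values that the narrowed type no longer
 * admits.
 */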

static int
vinyl_space_prepare_alter(struct space *old_space, struct space *new_space)
{
        (void)new_space;
        struct vy_env *env = vy_env(old_space->engine);

        if (vinyl_check_wal(env, "DDL") != 0)
                return -1;

        return 0;
}

static void
vinyl_space_invalidate(struct space *space)
{
        struct vy_env *env = vy_env(space->engine);
        /*
         * Abort all transactions involving the invalidated space.
         * An aborted transaction doesn't allow any DML/DQL requests
         * so the space won't be used anymore and can be safely
         * destroyed.
         *
         * There's a subtle corner case though - a transaction can
         * be reading disk from a DML request right now, with this
         * space passed to it in the argument list. However, it's
         * handled as well: the iterator will return an error as
         * soon as it's done reading disk, which will make the DML
         * request bail out early, without dereferencing the space.
         */
        bool unused;
        vy_tx_manager_abort_writers_for_ddl(env->xm, space, &unused);
}

/** Argument passed to vy_check_format_on_replace(). */
struct vy_check_format_ctx {
        /** Format to check new tuples against. */
        struct tuple_format *format;
        /** Set if a new tuple doesn't conform to the format. */
        bool is_failed;
        /** Container for storing errors. */
        struct diag diag;
};

/**
 * This is an on_replace trigger callback that checks inserted
 * tuples against a new format.
 */
static int
vy_check_format_on_replace(struct trigger *trigger, void *event)
{
        struct txn *txn = event;
        struct txn_stmt *stmt = txn_current_stmt(txn);
        struct vy_check_format_ctx *ctx = trigger->data;

        if (stmt->new_tuple == NULL)
                return 0; /* DELETE, nothing to do */

        if (ctx->is_failed)
                return 0; /* already failed, nothing to do */

        if (tuple_validate(ctx->format, stmt->new_tuple) != 0) {
                ctx->is_failed = true;
                diag_move(diag_get(), &ctx->diag);
        }
        return 0;
}

static int
vinyl_space_check_format(struct space *space, struct tuple_format *format)
{
        struct vy_env *env = vy_env(space->engine);
        struct txn *txn = in_txn();

        /*
         * If this is local recovery, the space was checked before
         * restart so there's nothing we need to do.
         */
        if (env->status == VINYL_INITIAL_RECOVERY_LOCAL ||
            env->status == VINYL_FINAL_RECOVERY_LOCAL)
                return 0;

        if (space->index_count == 0)
                return 0; /* space is empty, nothing to do */

        /*
         * Iterate over all tuples stored in the given space and
         * check each of them for conformity to the new format.
         * Since read iterator may yield, we install an on_replace
         * trigger to check tuples inserted after we started the
         * iteration.
         */
        struct vy_lsm *pk = vy_lsm(space->index[0]);

        /*
         * Transactions started before the space alter request can't
         * be checked with on_replace trigger so we abort them.
         */
        bool need_wal_sync;
        vy_tx_manager_abort_writers_for_ddl(env->xm, space, &need_wal_sync);

        if (!need_wal_sync && vy_lsm_is_empty(pk))
                return 0; /* space is empty, nothing to do */

        if (txn_check_singlestatement(txn, "space format check") != 0)
                return -1;

        struct trigger on_replace;
        struct vy_check_format_ctx ctx;
        ctx.format = format;
        ctx.is_failed = false;
        diag_create(&ctx.diag);
        trigger_create(&on_replace, vy_check_format_on_replace, &ctx, NULL);
        trigger_add(&space->on_replace, &on_replace);

        /*
         * Flush transactions waiting on WAL after installing on_replace
         * trigger so that changes made by newer transactions are checked
         * by the trigger callback.
         */
        int rc;
        if (need_wal_sync) {
                rc = wal_sync(NULL);
                if (rc != 0)
                        goto out;
        }

        struct vy_read_iterator itr;
        vy_read_iterator_open(&itr, pk, NULL, ITER_ALL, pk->env->empty_key,
                              &env->xm->p_committed_read_view);
        int loops = 0;
        struct vy_entry entry;
        while ((rc = vy_read_iterator_next(&itr, &entry)) == 0) {
                /*
                 * Read iterator yields only when it reads runs.
                 * Yield periodically in order not to stall the
                 * tx thread in case there are a lot of tuples in
                 * mems or cache.
                 */
                if (++loops % VY_YIELD_LOOPS == 0)
                        fiber_sleep(0);
                ERROR_INJECT_YIELD(ERRINJ_CHECK_FORMAT_DELAY);
                if (ctx.is_failed) {
                        diag_move(&ctx.diag, diag_get());
                        rc = -1;
                        break;
                }
                if (entry.stmt == NULL)
                        break;
                rc = tuple_validate(format, entry.stmt);
                if (rc != 0)
                        break;
        }
        vy_read_iterator_close(&itr);
out:
        diag_destroy(&ctx.diag);
        trigger_clear(&on_replace);
        return rc;
}

static void
vinyl_space_swap_index(struct space *old_space, struct space *new_space,
                       uint32_t old_index_id, uint32_t new_index_id)
{
        struct vy_lsm *old_lsm = vy_lsm(old_space->index_map[old_index_id]);
        struct vy_lsm *new_lsm = vy_lsm(new_space->index_map[new_index_id]);

        /*
         * Swap the two indexes between the two spaces,
         * but leave tuple formats.
         */
        generic_space_swap_index(old_space, new_space,
                                 old_index_id, new_index_id);

        SWAP(old_lsm, new_lsm);
        SWAP(old_lsm->mem_format, new_lsm->mem_format);
        SWAP(old_lsm->disk_format, new_lsm->disk_format);

        /* Update pointer to the primary key. */
        vy_lsm_update_pk(old_lsm, vy_lsm(old_space->index_map[0]));
        vy_lsm_update_pk(new_lsm, vy_lsm(new_space->index_map[0]));
}

static int
vinyl_space_add_primary_key(struct space *space)
{
        return vinyl_index_open(space->index[0]);
}

static size_t
vinyl_space_bsize(struct space *space)
{
        /*
         * Return the sum size of user data this space
         * accommodates. Since full tuples are stored in
         * primary indexes, it is basically the size of
         * binary data stored in this space's primary index.
         */
        struct index *pk = space_index(space, 0);
        if (pk == NULL)
                return 0;
        struct vy_lsm *lsm = vy_lsm(pk);
        return lsm->stat.memory.count.bytes + lsm->stat.disk.count.bytes;
}

static ssize_t
vinyl_index_size(struct index *index)
{
        /*
         * Return the total number of statements in the LSM tree.
         * Note, it may be greater than the number of tuples actually
         * stored in the space, but it should be a fairly good estimate.
         */
        struct vy_lsm *lsm = vy_lsm(index);
        return lsm->stat.memory.count.rows + lsm->stat.disk.count.rows;
}

static ssize_t
vinyl_index_bsize(struct index *index)
{
        /*
         * Return the cost of indexing user data. For both
         * primary and secondary indexes, this includes the
         * size of page index, bloom filter, and memory tree
         * extents. For secondary indexes, we also add the
         * total size of statements stored on disk, because
         * they are only needed for building the index.
         */
        struct vy_lsm *lsm = vy_lsm(index);
        ssize_t bsize = vy_lsm_mem_tree_size(lsm) +
                lsm->page_index_size + lsm->bloom_size;
        if (lsm->index_id > 0)
                bsize += lsm->stat.disk.count.bytes;
        return bsize;
}

static void
vinyl_index_compact(struct index *index)
{
        struct vy_lsm *lsm = vy_lsm(index);
        struct vy_env *env = vy_env(index->engine);
        vy_scheduler_force_compaction(&env->scheduler, lsm);
}

/* {{{ Public API of transaction control: start/end transaction,
 * read, write data in the context of a transaction.
 */

/**
 * Check if a request has already been committed to an LSM tree.
 *
 * If we're recovering the WAL, it may happen that this
 * particular run was dumped after the checkpoint, and we're
 * replaying records already present in the database. In this
 * case avoid overwriting a newer version with an older one.
 *
 * If the LSM tree is going to be dropped or truncated on WAL
 * recovery, there's no point in replaying statements for it,
 * either.
 *
 * Note, although we may skip secondary index update in certain
 * cases (e.g. UPDATE that doesn't touch secondary index parts
 * or DELETE for which generation of secondary index statement
 * is deferred), a DML request of any kind always updates the
 * primary index. Also, we always dump the primary index after
 * secondary indexes. So we may skip recovery of a DML request
 * if it has been dumped for the primary index.
 */
static inline bool
vy_is_committed(struct vy_env *env, struct vy_lsm *lsm)
{
        if (likely(env->status != VINYL_FINAL_RECOVERY_LOCAL))
                return false;
        if (lsm->is_dropped)
                return true;
        if (vclock_sum(env->recovery_vclock) <= lsm->dump_lsn)
                return true;
        return false;
}
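
/*
 * Editor's note: a worked example for the LSN check above, with
 * made-up numbers. If the recovery vclock sums to LSN 100 while
 * lsm->dump_lsn is 150, everything being replayed is already part
 * of a dumped run, so vy_is_committed() returns true and the
 * statement is skipped; with dump_lsn = 50 it returns false and
 * the statement is applied again.
 */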
1292

1293
/**
 * Get a full tuple by a tuple read from a secondary index.
 * @param lsm         LSM tree from which the tuple was read.
 * @param tx          Current transaction.
 * @param rv          Read view.
 * @param entry       Tuple read from a secondary index.
 * @param[out] result The found tuple is stored here. Must be
 *                    unreferenced after usage.
 *
 * @retval  0 Success.
 * @retval -1 Memory error or read error.
 */
static int
vy_get_by_secondary_tuple(struct vy_lsm *lsm, struct vy_tx *tx,
			  const struct vy_read_view **rv,
			  struct vy_entry entry, struct vy_entry *result)
{
	int rc = 0;
	assert(lsm->index_id > 0);

	/*
	 * Lookup the full tuple by a secondary statement.
	 * There are two cases: the secondary statement may be
	 * a key, if we got this tuple from disk, in which case
	 * we need to extract the primary key parts from it; or
	 * it may be a full tuple, if we got this tuple from
	 * the tuple cache or level 0, in which case we may pass
	 * it immediately to the iterator.
	 */
	struct vy_entry key;
	if (vy_stmt_is_key(entry.stmt)) {
		key.stmt = vy_stmt_extract_key(entry.stmt, lsm->pk_in_cmp_def,
					       lsm->env->key_format,
					       MULTIKEY_NONE);
		if (key.stmt == NULL)
			return -1;
	} else {
		key.stmt = entry.stmt;
		tuple_ref(key.stmt);
	}
	key.hint = vy_stmt_hint(key.stmt, lsm->pk->cmp_def);

	lsm->pk->stat.lookup++;

	struct vy_entry pk_entry;
	if (vy_point_lookup(lsm->pk, tx, rv, key, &pk_entry) != 0) {
		rc = -1;
		goto out;
	}

	bool match = false;
	struct vy_entry full_entry;
	if (pk_entry.stmt != NULL) {
		vy_stmt_foreach_entry(full_entry, pk_entry.stmt, lsm->cmp_def) {
			if (vy_entry_compare(full_entry, entry,
					     lsm->cmp_def) == 0) {
				match = true;
				break;
			}
		}
	}
	if (!match) {
		/*
		 * If a tuple read from a secondary index doesn't
		 * match the tuple corresponding to it in the
		 * primary index, it must have been overwritten or
		 * deleted, but the DELETE statement hasn't been
		 * propagated to the secondary index yet. In this
		 * case silently skip this tuple.
		 */
		vy_stmt_counter_acct_tuple(&lsm->stat.skip, entry.stmt);
		if (pk_entry.stmt != NULL) {
			vy_stmt_counter_acct_tuple(&lsm->pk->stat.skip,
						   pk_entry.stmt);
			tuple_unref(pk_entry.stmt);
		}
		/*
		 * We must purge stale tuples from the cache before
		 * storing the resulting interval in order to avoid
		 * chain intersections, which are not tolerated by
		 * the tuple cache implementation.
		 */
		vy_cache_on_write(&lsm->cache, entry, NULL);
		*result = vy_entry_none();
		goto out;
	}

	/*
	 * Even though the tuple is tracked in the secondary index
	 * read set, we still must track the full tuple read from
	 * the primary index, otherwise the transaction won't be
	 * aborted if this tuple is overwritten or deleted, because
	 * the DELETE statement is not written to secondary indexes
	 * immediately.
	 */
	if (tx != NULL && vy_tx_track_point(tx, lsm->pk, pk_entry) != 0) {
		tuple_unref(pk_entry.stmt);
		rc = -1;
		goto out;
	}

	if ((*rv)->vlsn == INT64_MAX)
		vy_cache_add_point(&lsm->pk->cache, pk_entry, key);

	vy_stmt_counter_acct_tuple(&lsm->pk->stat.get, pk_entry.stmt);
	*result = full_entry;
out:
	tuple_unref(key.stmt);
	return rc;
}

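/*
 * A sketch (not compiled) of how a caller typically combines a
 * secondary-index read with the primary-index join performed by
 * vy_get_by_secondary_tuple() above. `sec_entry` is assumed to come
 * from an iterator over the secondary LSM tree; the wrapper name is
 * hypothetical.
 */
#if 0
static int
read_full_tuple_sketch(struct vy_lsm *sec, struct vy_tx *tx,
		       const struct vy_read_view **rv,
		       struct vy_entry sec_entry, struct vy_entry *full)
{
	assert(sec->index_id > 0);
	/*
	 * May produce a "none" entry if the secondary statement is
	 * stale (overwritten in the primary index before the DELETE
	 * reached the secondary index) - callers must be prepared to
	 * skip it and keep iterating.
	 */
	return vy_get_by_secondary_tuple(sec, tx, rv, sec_entry, full);
}
#endif
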
/**
 * Get a tuple from a vinyl space by key.
 * @param lsm         LSM tree to search in.
 * @param tx          Current transaction.
 * @param rv          Read view.
 * @param key_stmt    Key statement.
 * @param[out] result The found tuple is stored here. Must be
 *                    unreferenced after usage.
 *
 * @retval  0 Success.
 * @retval -1 Memory error or read error.
 */
static int
vy_get(struct vy_lsm *lsm, struct vy_tx *tx,
       const struct vy_read_view **rv,
       struct tuple *key_stmt, struct tuple **result)
{
	double start_time = ev_monotonic_now(loop());
	/*
	 * tx can be NULL, for example, if a user calls
	 * space.index.get({key}).
	 */
	assert(tx == NULL || tx->state == VINYL_TX_READY);
	/*
	 * Make sure the LSM tree isn't deleted while we are
	 * reading from it.
	 */
	vy_lsm_ref(lsm);

	int rc;
	struct vy_entry partial, entry;

	struct vy_entry key;
	key.stmt = key_stmt;
	key.hint = vy_stmt_hint(key.stmt, lsm->cmp_def);

	lsm->stat.lookup++;

	if (vy_stmt_is_full_key(key.stmt, lsm->cmp_def)) {
		/*
		 * Use point lookup for a full key.
		 */
		if (tx != NULL && vy_tx_track_point(tx, lsm, key) != 0)
			goto fail;
		if (vy_point_lookup(lsm, tx, rv, key, &partial) != 0)
			goto fail;
		if (lsm->index_id > 0 && partial.stmt != NULL) {
			rc = vy_get_by_secondary_tuple(lsm, tx, rv,
						       partial, &entry);
			tuple_unref(partial.stmt);
			if (rc != 0)
				goto fail;
		} else {
			entry = partial;
		}
		if ((*rv)->vlsn == INT64_MAX)
			vy_cache_add_point(&lsm->cache, entry, key);
		goto out;
	}

	struct vy_read_iterator itr;
	vy_read_iterator_open(&itr, lsm, tx, ITER_EQ, key, rv);
	while ((rc = vy_read_iterator_next(&itr, &partial)) == 0) {
		if (lsm->index_id == 0 || partial.stmt == NULL) {
			entry = partial;
			if (entry.stmt != NULL)
				tuple_ref(entry.stmt);
			break;
		}
		rc = vy_get_by_secondary_tuple(lsm, tx, rv, partial, &entry);
		if (rc != 0 || entry.stmt != NULL)
			break;
	}
	if (rc == 0)
		vy_read_iterator_cache_add(&itr, entry);
	vy_read_iterator_close(&itr);
	if (rc != 0)
		goto fail;
out:
	*result = entry.stmt;

	double latency = ev_monotonic_now(loop()) - start_time;
	latency_collect(&lsm->stat.latency, latency);

	if (latency > lsm->env->too_long_threshold) {
		say_warn_ratelimited("%s: get(%s) => %s "
				     "took too long: %.3f sec",
				     vy_lsm_name(lsm), tuple_str(key.stmt),
				     tuple_str(*result), latency);
	}
	if (*result != NULL)
		vy_stmt_counter_acct_tuple(&lsm->stat.get, *result);
	vy_lsm_unref(lsm);
	return 0;
fail:
	vy_lsm_unref(lsm);
	return -1;
}

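/*
 * A condensed sketch (not compiled) of the lookup dispatch in vy_get()
 * above: a fully specified key can be answered with a single point
 * lookup, while a partial key has to fall back to an ITER_EQ scan that
 * may skip stale secondary statements along the way.
 */
#if 0
	if (vy_stmt_is_full_key(key.stmt, lsm->cmp_def)) {
		/* O(1) path: vy_point_lookup() + optional cache fill. */
	} else {
		/* Scan path: vy_read_iterator_open(..., ITER_EQ, ...)
		 * and loop until the first non-stale match. */
	}
#endif
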
/**
 * Get a tuple from a vinyl space by raw key.
 * @param lsm         LSM tree to search in.
 * @param tx          Current transaction.
 * @param rv          Read view.
 * @param key_raw     MsgPack array of key fields.
 * @param part_count  Count of parts in the key.
 * @param[out] result The found tuple is stored here. Must be
 *                    unreferenced after usage.
 *
 * @retval  0 Success.
 * @retval -1 Memory error or read error.
 */
static int
vy_get_by_raw_key(struct vy_lsm *lsm, struct vy_tx *tx,
		  const struct vy_read_view **rv,
		  const char *key_raw, uint32_t part_count,
		  struct tuple **result)
{
	struct tuple *key = vy_key_new(lsm->env->key_format,
				       key_raw, part_count);
	if (key == NULL)
		return -1;
	int rc = vy_get(lsm, tx, rv, key, result);
	tuple_unref(key);
	return rc;
}

/**
 * Check if insertion of a new tuple violates unique constraint
 * of the primary index.
 * @param tx         Current transaction.
 * @param rv         Read view.
 * @param lsm        LSM tree corresponding to the index.
 * @param stmt       New tuple.
 *
 * @retval  0 Success, unique constraint is satisfied.
 * @retval -1 Duplicate is found or read error occurred.
 */
static inline int
vy_check_is_unique_primary(struct vy_tx *tx, const struct vy_read_view **rv,
			   struct vy_lsm *lsm, struct tuple *stmt)
{
	assert(lsm->index_id == 0);
	assert(lsm->opts.is_unique);
	assert(vy_stmt_type(stmt) == IPROTO_INSERT);

	struct tuple *found;
	if (vy_get(lsm, tx, rv, stmt, &found))
		return -1;
	if (found != NULL) {
		struct index *index = &lsm->base;
		struct space *space = space_by_id(index->def->space_id);
		diag_set(ClientError, ER_TUPLE_FOUND, index->def->name,
			 space_name(space), tuple_str(found), tuple_str(stmt));
		tuple_unref(found);
		return -1;
	}
	return 0;
}

static int
vy_check_is_unique_secondary_one(struct vy_tx *tx, const struct vy_read_view **rv,
				 struct vy_lsm *lsm, struct tuple *stmt,
				 int multikey_idx)
{
	assert(lsm->index_id > 0);
	assert(vy_stmt_type(stmt) == IPROTO_INSERT ||
	       vy_stmt_type(stmt) == IPROTO_REPLACE);

	if (lsm->key_def->is_nullable &&
	    tuple_key_contains_null(stmt, lsm->key_def, multikey_idx))
		return 0;
	struct tuple *key = vy_stmt_extract_key(stmt, lsm->key_def,
						lsm->env->key_format,
						multikey_idx);
	if (key == NULL)
		return -1;
	struct tuple *found;
	int rc = vy_get(lsm, tx, rv, key, &found);
	tuple_unref(key);
	if (rc != 0)
		return -1;
	/*
	 * The old and new tuples may happen to be the same in
	 * terms of the primary key definition. For REPLACE this
	 * means that the operation overwrites the old tuple
	 * without modifying the secondary key and so there's
	 * actually no conflict. For INSERT this can only happen
	 * if we optimized out the primary index uniqueness check
	 * (see space_needs_check_unique_constraint()), in which
	 * case we must fail here.
	 */
	if (found != NULL && vy_stmt_type(stmt) == IPROTO_REPLACE &&
	    vy_stmt_compare(stmt, HINT_NONE, found, HINT_NONE,
			    lsm->pk->key_def) == 0) {
		tuple_unref(found);
		return 0;
	}
	if (found != NULL) {
		struct index *index = &lsm->base;
		struct space *space = space_by_id(index->def->space_id);
		diag_set(ClientError, ER_TUPLE_FOUND, index->def->name,
			 space_name(space), tuple_str(found), tuple_str(stmt));
		tuple_unref(found);
		return -1;
	}
	return 0;
}

/**
 * Check if insertion of a new tuple violates unique constraint
 * of a secondary index.
 * @param tx         Current transaction.
 * @param rv         Read view.
 * @param lsm        LSM tree corresponding to the index.
 * @param stmt       New tuple.
 *
 * @retval  0 Success, unique constraint is satisfied.
 * @retval -1 Duplicate is found or read error occurred.
 */
static int
vy_check_is_unique_secondary(struct vy_tx *tx, const struct vy_read_view **rv,
			     struct vy_lsm *lsm, struct tuple *stmt)
{
	assert(lsm->opts.is_unique);
	if (!lsm->cmp_def->is_multikey) {
		return vy_check_is_unique_secondary_one(tx, rv, lsm, stmt,
							MULTIKEY_NONE);
	}
	int count = tuple_multikey_count(stmt, lsm->cmp_def);
	for (int i = 0; i < count; ++i) {
		if (vy_check_is_unique_secondary_one(tx, rv, lsm, stmt, i) != 0)
			return -1;
	}
	return 0;
}

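/*
 * A worked illustration (not compiled) of the multikey branch above,
 * assuming a hypothetical tuple {1, {'a', 'b', 'a'}} indexed by an
 * array path: every array element yields its own secondary key, so
 * each element is checked independently and a duplicate against any
 * of them fails the whole statement.
 */
#if 0
	/* count == 3 for {1, {'a', 'b', 'a'}} */
	int count = tuple_multikey_count(stmt, lsm->cmp_def);
	for (int i = 0; i < count; ++i)
		/* One lookup per multikey entry; a plain index would
		 * use MULTIKEY_NONE and do a single lookup instead. */
		vy_check_is_unique_secondary_one(tx, rv, lsm, stmt, i);
#endif
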
/**
 * Check if insertion of a new tuple violates unique constraint
 * of any index of the space.
 * @param env          Vinyl environment.
 * @param tx           Current transaction.
 * @param space        Space to check.
 * @param stmt         New tuple.
 * @param column_mask  Mask of columns changed by the operation.
 *                     Used to optimize out uniqueness check in
 *                     secondary indexes when an inserted tuple
 *                     is a result of an UPDATE operation.
 *
 * @retval  0 Success, unique constraint is satisfied.
 * @retval -1 Duplicate is found or read error occurred.
 */
static int
vy_check_is_unique(struct vy_env *env, struct vy_tx *tx,
		   struct space *space, struct tuple *stmt,
		   uint64_t column_mask)
{
	assert(space->index_count > 0);
	assert(vy_stmt_type(stmt) == IPROTO_INSERT ||
	       vy_stmt_type(stmt) == IPROTO_REPLACE);
	/*
	 * During recovery we apply rows that were successfully
	 * applied before restart so no conflict is possible.
	 */
	if (env->status != VINYL_ONLINE)
		return 0;

	const struct vy_read_view **rv = vy_tx_read_view(tx);

	/*
	 * We only need to check the uniqueness of the primary index
	 * if this is INSERT, because REPLACE will silently overwrite
	 * the existing tuple, if any.
	 */
	if (space_needs_check_unique_constraint(space, 0) &&
	    vy_stmt_type(stmt) == IPROTO_INSERT) {
		struct vy_lsm *lsm = vy_lsm(space->index[0]);
		if (vy_check_is_unique_primary(tx, rv, lsm, stmt) != 0)
			return -1;
	}

	/*
	 * For secondary indexes, uniqueness must be checked on both
	 * INSERT and REPLACE.
	 */
	for (uint32_t i = 1; i < space->index_count; i++) {
		struct vy_lsm *lsm = vy_lsm(space->index[i]);
		if (!space_needs_check_unique_constraint(space, lsm->index_id))
			continue;
		if (key_update_can_be_skipped(lsm->key_def->column_mask,
					      column_mask))
			continue;
		if (vy_check_is_unique_secondary(tx, rv, lsm, stmt) != 0)
			return -1;
	}
	return 0;
}

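/*
 * A sketch (not compiled) of the column-mask shortcut used above:
 * if an UPDATE did not touch any field of a secondary index, the
 * uniqueness check for that index can be skipped. The masks are
 * 64-bit field bitmaps, so the essence of the test is a single AND;
 * the helper name below is hypothetical, see key_update_can_be_skipped()
 * for the real predicate.
 */
#if 0
static bool
update_skips_index_sketch(uint64_t index_mask, uint64_t update_mask)
{
	/* No bit in common => the indexed fields are unchanged. */
	return (index_mask & update_mask) == 0;
}
#endif
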
/**
 * Check that the key can be used for search in a unique index
 * LSM tree.
 * @param  lsm        LSM tree for checking.
 * @param  key        MessagePack'ed data, the array without a
 *                    header.
 * @param  part_count Part count of the key.
 *
 * @retval  0 The key is valid.
 * @retval -1 The key is not valid, the appropriate error is set
 *            in the diagnostics area.
 */
static inline int
vy_unique_key_validate(struct vy_lsm *lsm, const char *key,
		       uint32_t part_count)
{
	assert(lsm->opts.is_unique);
	assert(key != NULL || part_count == 0);
	/*
	 * The LSM tree contains tuples with concatenation of
	 * secondary and primary key fields, while the key
	 * supplied by the user only contains the secondary key
	 * fields. Use the correct key def to validate the key.
	 * The key can be used to look up in the LSM tree since
	 * the supplied key parts uniquely identify the tuple,
	 * as long as the index is unique.
	 */
	uint32_t original_part_count = lsm->key_def->part_count;
	if (original_part_count != part_count) {
		diag_set(ClientError, ER_EXACT_MATCH,
			 original_part_count, part_count);
		return -1;
	}
	const char *key_end;
	return key_validate_parts(lsm->cmp_def, key, part_count, false,
				  &key_end);
}

/**
 * Returns true if the deferred DELETE optimization should be enabled for the
 * given space. It is regulated by a per-space knob, but we also disable it if
 * the space has UPSERT statements, because the deferred DELETE optimization
 * doesn't handle them properly, see vy_write_iterator_deferred_delete().
 */
static inline bool
vy_defer_deletes(struct space *space, struct vy_lsm *pk)
{
	return space->def->opts.defer_deletes &&
		pk->stat.disk.stmt.upserts == 0;
}

/**
 * Execute DELETE in a vinyl space.
 * @param env     Vinyl environment.
 * @param tx      Current transaction.
 * @param stmt    Statement for triggers filled with deleted
 *                statement.
 * @param space   Vinyl space.
 * @param request Request with the tuple data.
 *
 * @retval  0 Success
 * @retval -1 Memory error OR the index is not found OR a tuple
 *            reference increment error.
 */
static int
vy_delete(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt,
	  struct space *space, struct request *request)
{
	struct vy_lsm *pk = vy_lsm_find(space, 0);
	if (pk == NULL)
		return -1;
	if (vy_is_committed(env, pk))
		return 0;
	struct vy_lsm *lsm = vy_lsm_find_unique(space, request->index_id);
	if (lsm == NULL)
		return -1;
	const char *key = request->key;
	uint32_t part_count = mp_decode_array(&key);
	if (vy_unique_key_validate(lsm, key, part_count))
		return -1;
	/*
	 * There are four cases when we need to get the full tuple
	 * before deletion:
	 * - the space has on_replace triggers and we need to pass
	 *   the old tuple to them;
	 * - the deletion is done by a secondary index;
	 * - the space has a secondary index and deferred DELETEs
	 *   are disabled;
	 * - CDC (WAL extensions) is enabled.
	 */
	if ((space->index_count > 1 && !vy_defer_deletes(space, pk)) ||
	    lsm->index_id > 0 || !rlist_empty(&space->on_replace) ||
	    space->wal_ext != NULL) {
		if (vy_get_by_raw_key(lsm, tx, vy_tx_read_view(tx),
				      key, part_count, &stmt->old_tuple) != 0)
			return -1;
		if (stmt->old_tuple == NULL)
			return 0;
	}
	int rc = 0;
	struct tuple *delete;
	if (stmt->old_tuple != NULL) {
		delete = vy_stmt_new_surrogate_delete(pk->mem_format,
						      stmt->old_tuple);
		if (delete == NULL)
			return -1;
		for (uint32_t i = 0; i < space->index_count; i++) {
			struct vy_lsm *lsm = vy_lsm(space->index[i]);
			if (vy_is_committed(env, lsm))
				continue;
			rc = vy_tx_set(tx, lsm, delete);
			if (rc != 0)
				break;
		}
	} else {
		assert(lsm->index_id == 0);
		delete = vy_stmt_new_delete(pk->env->key_format,
					    request->key, request->key_end);
		if (delete == NULL)
			return -1;
		if (space->index_count > 1)
			vy_stmt_set_flags(delete, VY_STMT_DEFERRED_DELETE);
		rc = vy_tx_set(tx, pk, delete);
	}
	tuple_unref(delete);
	return rc;
}

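/*
 * A sketch (not compiled) of the deferred-DELETE branch above: when
 * the old tuple is not read (a primary-key DELETE with deferred
 * DELETEs enabled), the statement is flagged so that the write
 * iterator can generate the secondary-index DELETEs later, during
 * dump or compaction, when the old tuple surfaces from older runs.
 */
#if 0
	struct tuple *delete = vy_stmt_new_delete(pk->env->key_format,
						  request->key,
						  request->key_end);
	if (space->index_count > 1)
		vy_stmt_set_flags(delete, VY_STMT_DEFERRED_DELETE);
	/* Only the primary index is written now; secondary indexes
	 * get their DELETEs from the write iterator later. */
	vy_tx_set(tx, pk, delete);
#endif
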
/**
 * We do not allow changes of the primary key during update.
 *
 * The syntax of update operation allows the user to update the
 * primary key of a tuple, which is prohibited, to avoid funny
 * effects during replication.
 *
 * @param space       Space being updated.
 * @param pk          Primary index LSM tree.
 * @param old_tuple   The tuple before update.
 * @param new_tuple   The tuple after update.
 * @param column_mask Bitmask of the update operation.
 *
 * @retval  0 Success, the primary key is not modified in the new
 *            tuple.
 * @retval -1 Attempt to modify the primary key.
 */
static inline int
vy_check_update(struct space *space, const struct vy_lsm *pk,
		struct tuple *old_tuple, struct tuple *new_tuple,
		uint64_t column_mask)
{
	if (!key_update_can_be_skipped(pk->key_def->column_mask, column_mask) &&
	    vy_stmt_compare(old_tuple, HINT_NONE, new_tuple,
			    HINT_NONE, pk->key_def) != 0) {
		diag_set(ClientError, ER_CANT_UPDATE_PRIMARY_KEY,
			 space_name(space));
		return -1;
	}
	return 0;
}

/**
 * An UPDATE operation turns into a REPLACE statement in the
 * primary index and into DELETE + INSERT in secondary indexes.
 * This function performs an UPDATE operation in the given
 * transaction write set after stmt->old_tuple and new_tuple
 * have been initialized and checked.
 */
static int
vy_perform_update(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt,
		  struct space *space, struct vy_lsm *pk, int64_t column_mask)
{
	assert(stmt->old_tuple != NULL);
	assert(stmt->new_tuple != NULL);

	if (vy_check_is_unique(env, tx, space, stmt->new_tuple,
			       column_mask) != 0)
		return -1;

	vy_stmt_set_flags(stmt->new_tuple, VY_STMT_UPDATE);

	if (vy_tx_set(tx, pk, stmt->new_tuple) != 0)
		return -1;
	if (space->index_count == 1)
		return 0;

	struct tuple *delete = vy_stmt_new_surrogate_delete(pk->mem_format,
							    stmt->old_tuple);
	if (delete == NULL)
		return -1;

	for (uint32_t i = 1; i < space->index_count; ++i) {
		struct vy_lsm *lsm = vy_lsm(space->index[i]);
		if (vy_is_committed(env, lsm))
			continue;
		if (vy_tx_set(tx, lsm, delete) != 0)
			goto error;
		if (vy_tx_set(tx, lsm, stmt->new_tuple) != 0)
			goto error;
	}
	tuple_unref(delete);
	return 0;
error:
	tuple_unref(delete);
	return -1;
}

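/*
 * A sketch (not compiled) of the statement decomposition performed by
 * vy_perform_update() above, for a space with one secondary index
 * `sec` (a hypothetical local name): one REPLACE for the primary key
 * plus a DELETE/REPLACE pair for the secondary key, all in the same
 * transaction write set.
 */
#if 0
	/* pk:  REPLACE{new}                          */
	vy_tx_set(tx, pk, stmt->new_tuple);
	/* sec: DELETE{old} followed by REPLACE{new}  */
	vy_tx_set(tx, sec, delete);
	vy_tx_set(tx, sec, stmt->new_tuple);
#endif
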
/**
 * Execute UPDATE in a vinyl space.
 * @param env     Vinyl environment.
 * @param tx      Current transaction.
 * @param stmt    Statement for triggers filled with old and new
 *                statements.
 * @param space   Vinyl space.
 * @param request Request with the tuple data.
 *
 * @retval  0 Success
 * @retval -1 Memory error OR the index is not found OR a tuple
 *            reference increment error.
 */
static int
vy_update(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt,
	  struct space *space, struct request *request)
{
	assert(tx != NULL && tx->state == VINYL_TX_READY);
	struct vy_lsm *pk = vy_lsm_find(space, 0);
	if (pk == NULL)
		return -1;
	if (vy_is_committed(env, pk))
		return 0;
	struct vy_lsm *lsm = vy_lsm_find_unique(space, request->index_id);
	if (lsm == NULL)
		return -1;
	const char *key = request->key;
	uint32_t part_count = mp_decode_array(&key);
	if (vy_unique_key_validate(lsm, key, part_count))
		return -1;

	if (vy_get_by_raw_key(lsm, tx, vy_tx_read_view(tx),
			      key, part_count, &stmt->old_tuple) != 0)
		return -1;
	/* Nothing to update. */
	if (stmt->old_tuple == NULL)
		return 0;

	/* Apply update operations. */
	uint64_t column_mask = 0;
	const char *new_tuple, *new_tuple_end;
	uint32_t new_size, old_size;
	const char *old_tuple = tuple_data_range(stmt->old_tuple, &old_size);
	const char *old_tuple_end = old_tuple + old_size;
	size_t region_svp = region_used(&fiber()->gc);
	new_tuple = xrow_update_execute(request->tuple, request->tuple_end,
					old_tuple, old_tuple_end,
					pk->mem_format, &new_size,
					request->index_base, &column_mask);
	if (new_tuple == NULL)
		return -1;
	new_tuple_end = new_tuple + new_size;
	/*
	 * Check that the new tuple matches the space format and
	 * the primary key was not modified.
	 */
	if (tuple_validate_raw(pk->mem_format, new_tuple)) {
		region_truncate(&fiber()->gc, region_svp);
		return -1;
	}
	stmt->new_tuple = vy_stmt_new_replace(pk->mem_format, new_tuple,
					      new_tuple_end);
	region_truncate(&fiber()->gc, region_svp);
	if (stmt->new_tuple == NULL)
		return -1;
	if (vy_check_update(space, pk, stmt->old_tuple, stmt->new_tuple,
			    column_mask) != 0)
		return -1;

	return vy_perform_update(env, tx, stmt, space, pk, column_mask);
}

/**
 * Insert the tuple into the space without checking duplicates in
 * the primary index.
 * @param env       Vinyl environment.
 * @param tx        Current transaction.
 * @param space     Space to insert into.
 * @param stmt      Tuple to upsert.
 *
 * @retval  0 Success.
 * @retval -1 Memory error or a secondary index duplicate error.
 */
static int
vy_insert_first_upsert(struct vy_env *env, struct vy_tx *tx,
		       struct space *space, struct tuple *stmt)
{
	assert(tx != NULL && tx->state == VINYL_TX_READY);
	assert(space->index_count > 0);
	assert(vy_stmt_type(stmt) == IPROTO_INSERT);
	if (vy_check_is_unique(env, tx, space, stmt, COLUMN_MASK_FULL) != 0)
		return -1;
	struct vy_lsm *pk = vy_lsm(space->index[0]);
	assert(pk->index_id == 0);
	if (vy_tx_set(tx, pk, stmt) != 0)
		return -1;
	for (uint32_t i = 1; i < space->index_count; ++i) {
		struct vy_lsm *lsm = vy_lsm(space->index[i]);
		if (vy_tx_set(tx, lsm, stmt) != 0)
			return -1;
	}
	return 0;
}

/**
 * Insert UPSERT into the write set of the transaction.
 * @param tx        Transaction which executes the upsert.
 * @param lsm       LSM tree in which \p tx upserts.
 * @param tuple     MessagePack array.
 * @param tuple_end End of the tuple.
 * @param expr      MessagePack array of update operations.
 * @param expr_end  End of the \p expr.
 *
 * @retval  0 Success.
 * @retval -1 Memory error.
 */
static int
vy_lsm_upsert(struct vy_tx *tx, struct vy_lsm *lsm,
	  const char *tuple, const char *tuple_end,
	  const char *expr, const char *expr_end)
{
	assert(tx == NULL || tx->state == VINYL_TX_READY);
	struct tuple *vystmt;
	struct iovec operations[2];
	/* MP_ARRAY with size 1. */
	char header = 0x91;
	operations[0].iov_base = &header;
	operations[0].iov_len = 1;
	operations[1].iov_base = (void *)expr;
	operations[1].iov_len = expr_end - expr;
	vystmt = vy_stmt_new_upsert(lsm->mem_format, tuple, tuple_end,
				    operations, 2);
	if (vystmt == NULL)
		return -1;
	assert(vy_stmt_type(vystmt) == IPROTO_UPSERT);
	int rc = vy_tx_set(tx, lsm, vystmt);
	tuple_unref(vystmt);
	return rc;
}

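/*
 * A note on the 0x91 byte above: in MsgPack, 0x90 | N encodes a
 * fixarray of N elements, so 0x91 is a one-element array header.
 * The two iovecs therefore serialize to the ops array wrapped in an
 * outer array - the UPSERT statement format (squashing several
 * UPSERTs together would grow this outer array). A sketch (not
 * compiled) of the resulting layout:
 */
#if 0
	/* operations[0] = "\x91"         - array header, 1 element  */
	/* operations[1] = expr..expr_end - MsgPack array of ops     */
	/* serialized    = 0x91 <ops>    => [[op, op, ...]]          */
#endif
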
static int
request_normalize_ops(struct request *request)
{
	assert(request->type == IPROTO_UPSERT ||
	       request->type == IPROTO_UPDATE);
	assert(request->index_base != 0);
	char *ops;
	ssize_t ops_len = request->ops_end - request->ops;
	ops = (char *)region_alloc(&fiber()->gc, ops_len);
	if (ops == NULL)
		return -1;
	char *ops_end = ops;
	const char *pos = request->ops;
	int op_cnt = mp_decode_array(&pos);
	ops_end = mp_encode_array(ops_end, op_cnt);
	int op_no = 0;
	for (op_no = 0; op_no < op_cnt; ++op_no) {
		int op_len = mp_decode_array(&pos);
		ops_end = mp_encode_array(ops_end, op_len);

		uint32_t op_name_len;
		const char *op_name = mp_decode_str(&pos, &op_name_len);
		ops_end = mp_encode_str(ops_end, op_name, op_name_len);

		int field_no;
		const char *field_name;
		switch (mp_typeof(*pos)) {
		case MP_INT:
			field_no = mp_decode_int(&pos);
			ops_end = mp_encode_int(ops_end, field_no);
			break;
		case MP_UINT:
			field_no = mp_decode_uint(&pos);
			field_no -= request->index_base;
			ops_end = mp_encode_uint(ops_end, field_no);
			break;
		case MP_STR:
			field_name = pos;
			mp_next(&pos);
			memcpy(ops_end, field_name, pos - field_name);
			ops_end += pos - field_name;
			break;
		default:
			unreachable();
		}

		if (*op_name == ':') {
			/**
			 * The splice op: adjust the string position
			 * and copy two additional arguments.
			 */
			int str_pos;
			if (mp_typeof(*pos) == MP_INT) {
				str_pos = mp_decode_int(&pos);
				ops_end = mp_encode_int(ops_end, str_pos);
			} else {
				str_pos = mp_decode_uint(&pos);
				str_pos -= request->index_base;
				ops_end = mp_encode_uint(ops_end, str_pos);
			}
			const char *arg = pos;
			mp_next(&pos);
			memcpy(ops_end, arg, pos - arg);
			ops_end += pos - arg;
		}
		const char *arg = pos;
		mp_next(&pos);
		memcpy(ops_end, arg, pos - arg);
		ops_end += pos - arg;
	}
	request->ops = (const char *)ops;
	request->ops_end = (const char *)ops_end;
	request->index_base = 0;

	/* Clear the header to ensure it's rebuilt at commit. */
	request->header = NULL;
	return 0;
}

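/*
 * A worked example (not compiled) of the normalization above. A Lua
 * client sends 1-based field numbers, e.g. the op {'=', 2, 'x'} with
 * request->index_base == 1; after normalization the stored op becomes
 * {'=', 1, 'x'} with index_base == 0, so the operations can be applied
 * uniformly on replay regardless of where the request came from.
 */
#if 0
	/* before: index_base = 1, ops = [['=', 2, 'x']] */
	request_normalize_ops(request);
	/* after:  index_base = 0, ops = [['=', 1, 'x']] */
#endif
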
/**
 * Execute UPSERT in a vinyl space.
 * @param env     Vinyl environment.
 * @param tx      Current transaction.
 * @param stmt    Statement for triggers filled with old and new
 *                statements.
 * @param space   Vinyl space.
 * @param request Request with the tuple data and update
 *                operations.
 *
 * @retval  0 Success
 * @retval -1 Memory error OR the index is not found OR a tuple
 *            reference increment error.
 */
static int
vy_upsert(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt,
	  struct space *space, struct request *request)
{
	assert(tx != NULL && tx->state == VINYL_TX_READY);
	struct vy_lsm *pk = vy_lsm_find(space, 0);
	if (pk == NULL)
		return -1;
	if (vy_is_committed(env, pk))
		return 0;
	/* Check update operations. */
	if (xrow_update_check_ops(request->ops, request->ops_end,
				  pk->mem_format, request->index_base) != 0) {
		return -1;
	}
	if (request->index_base != 0) {
		if (request_normalize_ops(request))
			return -1;
	}
	assert(request->index_base == 0);
	const char *tuple = request->tuple;
	const char *tuple_end = request->tuple_end;
	const char *ops = request->ops;
	const char *ops_end = request->ops_end;
	if (tuple_validate_raw(pk->mem_format, tuple))
		return -1;

	if (space->index_count == 1 && rlist_empty(&space->on_replace) &&
	    !space->has_foreign_keys && space->wal_ext == NULL)
		return vy_lsm_upsert(tx, pk, tuple, tuple_end, ops, ops_end);

	const char *old_tuple, *old_tuple_end;
	const char *new_tuple, *new_tuple_end;
	uint32_t new_size;
	uint64_t column_mask;
	/*
	 * There are two cases when we need to get the old tuple
	 * before upsert:
	 * - if the space has one or more on_replace triggers;
	 *
	 * - if the space has one or more secondary indexes: then
	 *   we need to extract secondary keys from the old tuple
	 *   to delete old tuples from secondary indexes.
	 */
	/* Find the old tuple using the primary key. */
	struct tuple *key = vy_stmt_extract_key_raw(tuple, tuple_end,
					pk->key_def, pk->env->key_format,
					MULTIKEY_NONE);
	if (key == NULL)
		return -1;
	int rc = vy_get(pk, tx, vy_tx_read_view(tx), key, &stmt->old_tuple);
	tuple_unref(key);
	if (rc != 0)
		return -1;
	/*
	 * If the old tuple was not found then UPSERT
	 * turns into INSERT.
	 */
	if (stmt->old_tuple == NULL) {
		stmt->new_tuple = vy_stmt_new_insert(pk->mem_format,
						     tuple, tuple_end);
		if (stmt->new_tuple == NULL)
			return -1;
		return vy_insert_first_upsert(env, tx, space, stmt->new_tuple);
	}
	uint32_t old_size;
	old_tuple = tuple_data_range(stmt->old_tuple, &old_size);
	old_tuple_end = old_tuple + old_size;

	size_t region_svp = region_used(&fiber()->gc);
	/* Apply upsert operations to the old tuple. */
	new_tuple = xrow_upsert_execute(ops, ops_end, old_tuple, old_tuple_end,
					pk->mem_format, &new_size, 0, false,
					&column_mask);
	if (new_tuple == NULL)
		return -1;
	/*
	 * Check that the new tuple matches the space
	 * format and the primary key was not modified.
	 */
	if (tuple_validate_raw(pk->mem_format, new_tuple)) {
		region_truncate(&fiber()->gc, region_svp);
		return -1;
	}
	new_tuple_end = new_tuple + new_size;
	stmt->new_tuple = vy_stmt_new_replace(pk->mem_format, new_tuple,
					      new_tuple_end);
	region_truncate(&fiber()->gc, region_svp);
	if (stmt->new_tuple == NULL)
		return -1;
	if (vy_check_update(space, pk, stmt->old_tuple, stmt->new_tuple,
			    column_mask) != 0) {
		diag_log();
		/*
		 * Upsert is skipped, to match the semantics of
		 * vy_lsm_upsert(). Clear the statement so that
		 * vy_build_on_replace() doesn't try to apply it.
		 */
		tuple_unref(stmt->old_tuple);
		tuple_unref(stmt->new_tuple);
		stmt->old_tuple = NULL;
		stmt->new_tuple = NULL;
		return 0;
	}
	return vy_perform_update(env, tx, stmt, space, pk, column_mask);
}

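/*
 * A condensed sketch (not compiled) of the three UPSERT outcomes
 * handled above: the fast path writes a real UPSERT statement, while
 * the slow path degenerates to an INSERT or to a full
 * read-modify-write UPDATE, depending on whether the old tuple exists.
 */
#if 0
	if (fast_path) /* single index, no triggers, no FKs, no WAL ext */
		return vy_lsm_upsert(tx, pk, tuple, tuple_end, ops, ops_end);
	else if (stmt->old_tuple == NULL)
		return vy_insert_first_upsert(env, tx, space, stmt->new_tuple);
	else
		return vy_perform_update(env, tx, stmt, space, pk, column_mask);
#endif
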
/**
 * Execute INSERT in a vinyl space.
 * @param env     Vinyl environment.
 * @param tx      Current transaction.
 * @param stmt    Statement for triggers filled with the new
 *                statement.
 * @param space   Vinyl space.
 * @param request Request with the tuple data and update
 *                operations.
 *
 * @retval  0 Success
 * @retval -1 Memory error OR duplicate error OR the primary
 *            index is not found
 */
static int
vy_insert(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt,
	  struct space *space, struct request *request)
{
	assert(tx != NULL && tx->state == VINYL_TX_READY);
	struct vy_lsm *pk = vy_lsm_find(space, 0);
	if (pk == NULL)
		return -1;
	if (vy_is_committed(env, pk))
		return 0;
	if (tuple_validate_raw(pk->mem_format, request->tuple))
		return -1;
	/* First insert into the primary index. */
	stmt->new_tuple = vy_stmt_new_insert(pk->mem_format, request->tuple,
					     request->tuple_end);
	if (stmt->new_tuple == NULL)
		return -1;
	if (vy_check_is_unique(env, tx, space, stmt->new_tuple,
			       COLUMN_MASK_FULL) != 0)
		return -1;
	if (vy_tx_set(tx, pk, stmt->new_tuple) != 0)
		return -1;

	for (uint32_t iid = 1; iid < space->index_count; ++iid) {
		struct vy_lsm *lsm = vy_lsm(space->index[iid]);
		if (vy_is_committed(env, lsm))
			continue;
		if (vy_tx_set(tx, lsm, stmt->new_tuple) != 0)
			return -1;
	}
	return 0;
}

/**
 * Execute REPLACE in a vinyl space.
 * @param env     Vinyl environment.
 * @param tx      Current transaction.
 * @param stmt    Statement for triggers filled with old
 *                statement.
 * @param space   Vinyl space.
 * @param request Request with the tuple data.
 *
 * @retval  0 Success
 * @retval -1 Memory error OR duplicate key error OR the primary
 *            index is not found OR a tuple reference increment
 *            error.
 */
static int
vy_replace(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt,
	   struct space *space, struct request *request)
{
	assert(tx != NULL && tx->state == VINYL_TX_READY);
	struct vy_lsm *pk = vy_lsm_find(space, 0);
	if (pk == NULL)
		return -1;
	if (vy_is_committed(env, pk))
		return 0;

	/* Validate and create a statement for the new tuple. */
	if (tuple_validate_raw(pk->mem_format, request->tuple))
		return -1;
	stmt->new_tuple = vy_stmt_new_replace(pk->mem_format, request->tuple,
					      request->tuple_end);
	if (stmt->new_tuple == NULL)
		return -1;
	if (vy_check_is_unique(env, tx, space, stmt->new_tuple,
			       COLUMN_MASK_FULL) != 0)
		return -1;
	/*
	 * There are three cases when we need to get the full tuple
	 * on replace:
	 * - the space has on_replace triggers and we need to pass
	 *   the old tuple to them;
	 * - the space has a secondary index and deferred DELETEs
	 *   are disabled;
	 * - the space has WAL extensions.
	 */
	if ((space->index_count > 1 && !vy_defer_deletes(space, pk)) ||
	    !rlist_empty(&space->on_replace) || space->wal_ext != NULL) {
		if (vy_get(pk, tx, vy_tx_read_view(tx),
			   stmt->new_tuple, &stmt->old_tuple) != 0)
			return -1;
		if (stmt->old_tuple == NULL) {
			/*
			 * We can turn REPLACE into INSERT if the
			 * new key does not have history.
			 */
			vy_stmt_set_type(stmt->new_tuple, IPROTO_INSERT);
		}
	} else if (space->index_count > 1) {
		vy_stmt_set_flags(stmt->new_tuple, VY_STMT_DEFERRED_DELETE);
	}
	/*
	 * Replace in the primary index without explicit deletion
	 * of the old tuple.
	 */
	if (vy_tx_set(tx, pk, stmt->new_tuple) != 0)
		return -1;
	if (space->index_count == 1)
		return 0;
	/*
	 * Replace in secondary indexes with explicit deletion
	 * of the old tuple, if any.
	 */
	int rc = 0;
	struct tuple *delete = NULL;
	if (stmt->old_tuple != NULL) {
		delete = vy_stmt_new_surrogate_delete(pk->mem_format,
						      stmt->old_tuple);
		if (delete == NULL)
			return -1;
	}
	for (uint32_t i = 1; i < space->index_count; i++) {
		struct vy_lsm *lsm = vy_lsm(space->index[i]);
		if (vy_is_committed(env, lsm))
			continue;
		if (delete != NULL) {
			rc = vy_tx_set(tx, lsm, delete);
			if (rc != 0)
				break;
		}
		rc = vy_tx_set(tx, lsm, stmt->new_tuple);
		if (rc != 0)
			break;
	}
	if (delete != NULL)
		tuple_unref(delete);
	return rc;
}

static int
vinyl_space_execute_replace(struct space *space, struct txn *txn,
			    struct request *request, struct tuple **result)
{
	assert(request->index_id == 0);
	struct vy_env *env = vy_env(space->engine);
	struct vy_tx *tx = txn->engine_tx;
	struct txn_stmt *stmt = txn_current_stmt(txn);
	int rc;
	if (request->type == IPROTO_INSERT)
		rc = vy_insert(env, tx, stmt, space, request);
	else
		rc = vy_replace(env, tx, stmt, space, request);
	if (rc != 0)
		return -1;
	*result = stmt->new_tuple;
	return 0;
}

static int
vinyl_space_execute_delete(struct space *space, struct txn *txn,
			   struct request *request, struct tuple **result)
{
	struct vy_env *env = vy_env(space->engine);
	struct vy_tx *tx = txn->engine_tx;
	struct txn_stmt *stmt = txn_current_stmt(txn);
	if (vy_delete(env, tx, stmt, space, request))
		return -1;
	/*
	 * Delete may or may not set stmt->old_tuple,
	 * but we always return NULL.
	 */
	*result = NULL;
	return 0;
}

static int
vinyl_space_execute_update(struct space *space, struct txn *txn,
			   struct request *request, struct tuple **result)
{
	struct vy_env *env = vy_env(space->engine);
	struct vy_tx *tx = txn->engine_tx;
	struct txn_stmt *stmt = txn_current_stmt(txn);
	if (vy_update(env, tx, stmt, space, request) != 0)
		return -1;
	*result = stmt->new_tuple;
	return 0;
}

static int
vinyl_space_execute_upsert(struct space *space, struct txn *txn,
			   struct request *request)
{
	struct vy_env *env = vy_env(space->engine);
	struct vy_tx *tx = txn->engine_tx;
	struct txn_stmt *stmt = txn_current_stmt(txn);
	return vy_upsert(env, tx, stmt, space, request);
}

static int
vinyl_engine_begin(struct engine *engine, struct txn *txn)
{
	struct vy_env *env = vy_env(engine);
	assert(txn->engine_tx == NULL);
	txn->engine_tx = vy_tx_begin(env->xm, txn->isolation);
	if (txn->engine_tx == NULL)
		return -1;
	return 0;
}

static int
vinyl_engine_prepare(struct engine *engine, struct txn *txn)
{
	struct vy_env *env = vy_env(engine);
	struct vy_tx *tx = txn->engine_tx;
	assert(tx != NULL);

	if (tx->write_size > 0 &&
	    vinyl_check_wal(env, "DML") != 0)
		return -1;

	/*
	 * Do not abort join/subscribe on quota timeout - replication
	 * is asynchronous anyway and there's box.info.replication
	 * available for the admin to track the lag so let the applier
	 * wait as long as necessary for memory dump to complete.
	 */
	double timeout = (tx->is_applier_session ?
			  TIMEOUT_INFINITY : env->timeout);
	/*
	 * Reserve quota needed by the transaction before allocating
	 * memory. Since this may yield, which opens a time window for
	 * the transaction to be sent to read view or aborted, we call
	 * it before checking for conflicts.
	 */
	if (vy_quota_use(&env->quota, VY_QUOTA_CONSUMER_TX,
			 tx->write_size, timeout) != 0)
		return -1;

	size_t mem_used_before = lsregion_used(&env->mem_env.allocator);

	int rc = vy_tx_prepare(tx);

	size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
	assert(mem_used_after >= mem_used_before);
	vy_quota_adjust(&env->quota, VY_QUOTA_CONSUMER_TX,
			tx->write_size, mem_used_after - mem_used_before);
	vy_regulator_check_dump_watermark(&env->regulator);
	return rc;
}

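/*
 * A sketch (not compiled) of the two-phase quota accounting above:
 * reserve an estimate (tx->write_size) up front, while waiting and
 * yielding are still allowed, then correct the reservation to the
 * exact number of bytes the lsregion actually allocated.
 */
#if 0
	vy_quota_use(&env->quota, VY_QUOTA_CONSUMER_TX,
		     tx->write_size, timeout);		/* may yield */
	size_t before = lsregion_used(&env->mem_env.allocator);
	vy_tx_prepare(tx);
	size_t actual = lsregion_used(&env->mem_env.allocator) - before;
	vy_quota_adjust(&env->quota, VY_QUOTA_CONSUMER_TX,
			tx->write_size, actual);	/* fix the estimate */
#endif
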
static void
vinyl_engine_commit(struct engine *engine, struct txn *txn)
{
	struct vy_env *env = vy_env(engine);
	struct vy_tx *tx = txn->engine_tx;
	assert(tx != NULL);

	/*
	 * vy_tx_commit() may trigger an upsert squash.
	 * If there is no memory for a created statement,
	 * it silently fails. But if it succeeds, we
	 * need to account the memory in the quota.
	 */
	size_t mem_used_before = lsregion_used(&env->mem_env.allocator);

	vy_tx_commit(tx, txn->signature);

	size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
	assert(mem_used_after >= mem_used_before);
	/* We can't abort the transaction at this point, use force. */
	vy_quota_force_use(&env->quota, VY_QUOTA_CONSUMER_TX,
			   mem_used_after - mem_used_before);
	vy_regulator_check_dump_watermark(&env->regulator);

	txn->engine_tx = NULL;
}

static void
2529
vinyl_engine_rollback(struct engine *engine, struct txn *txn)
3,253✔
2530
{
2531
        (void)engine;
2532
        struct vy_tx *tx = txn->engine_tx;
3,253✔
2533
        if (tx == NULL)
3,253!
2534
                return;
×
2535

2536
        vy_tx_rollback(tx);
3,253✔
2537

2538
        txn->engine_tx = NULL;
3,253✔
2539
}
2540

2541
static int
vinyl_engine_begin_statement(struct engine *engine, struct txn *txn)
{
        (void)engine;
        struct vy_tx *tx = txn->engine_tx;
        struct txn_stmt *stmt = txn_current_stmt(txn);
        assert(tx != NULL);
        return vy_tx_begin_statement(tx, stmt->space, &stmt->engine_savepoint);
}

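/** Roll back a statement to the savepoint taken when it began. */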
static void
vinyl_engine_rollback_statement(struct engine *engine, struct txn *txn,
                                struct txn_stmt *stmt)
{
        (void)engine;
        struct vy_tx *tx = txn->engine_tx;
        assert(tx != NULL);
        vy_tx_rollback_statement(tx, stmt->engine_savepoint);
}

2562
vinyl_engine_switch_to_ro(struct engine *engine)
2,513✔
2563
{
2564
        struct vy_env *env = vy_env(engine);
2,513✔
2565
        vy_tx_manager_abort_writers_for_ro(env->xm);
2,513✔
2566
}
2,513✔
2567

2568
/* }}} Public API of transaction control */
2569

2570
/** {{{ Environment */
2571

2572
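/** Quota callback: invoked when memory consumption exceeds the quota. */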
static void
vy_env_quota_exceeded_cb(struct vy_quota *quota)
{
        struct vy_env *env = container_of(quota, struct vy_env, quota);
        vy_regulator_quota_exceeded(&env->regulator);
}

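/**
 * Regulator callback: ask the scheduler to dump in-memory data.
 * Fails if there is nothing to dump.
 */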
static int
vy_env_trigger_dump_cb(struct vy_regulator *regulator)
{
        struct vy_env *env = container_of(regulator, struct vy_env, regulator);

        if (lsregion_used(&env->mem_env.allocator) == 0) {
                /*
                 * The memory limit has been exceeded, but there's
                 * nothing to dump. This may happen if all available
                 * quota has been consumed by pending transactions.
                 * There's nothing we can do about that.
                 */
                return -1;
        }
        vy_scheduler_trigger_dump(&env->scheduler);
        return 0;
}

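/**
 * Scheduler callback invoked when a memory dump completes:
 * frees the dumped in-memory data and releases quota.
 */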
static void
vy_env_dump_complete_cb(struct vy_scheduler *scheduler,
                        int64_t dump_generation, double dump_duration)
{
        struct vy_env *env = container_of(scheduler, struct vy_env, scheduler);

        /* Free memory and release quota. */
        struct lsregion *allocator = &env->mem_env.allocator;
        struct vy_quota *quota = &env->quota;
        size_t mem_used_before = lsregion_used(allocator);
        lsregion_gc(allocator, dump_generation);
        size_t mem_used_after = lsregion_used(allocator);
        assert(mem_used_after <= mem_used_before);
        size_t mem_dumped = mem_used_before - mem_used_after;
        /*
         * In certain corner cases, vy_quota_release() may need
         * to trigger a new dump. Notify the regulator about dump
         * completion before releasing quota so that it can start
         * a new dump immediately.
         */
        vy_regulator_dump_complete(&env->regulator, mem_dumped, dump_duration);
        vy_quota_release(quota, mem_dumped);

        vy_regulator_update_rate_limit(&env->regulator, &scheduler->stat,
                                       scheduler->compaction_pool.size);
}

static struct vy_squash_queue *
vy_squash_queue_new(void);
static void
vy_squash_queue_delete(struct vy_squash_queue *q);
static void
vy_squash_schedule(struct vy_lsm *lsm, struct vy_entry entry,
                   void /* struct vy_env */ *arg);

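/** Allocate and initialize a vinyl environment. */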
static struct vy_env *
vy_env_new(const char *path, size_t memory,
           int read_threads, int write_threads, bool force_recovery)
{
        struct vy_env *e = malloc(sizeof(*e));
        if (unlikely(e == NULL)) {
                diag_set(OutOfMemory, sizeof(*e), "malloc", "struct vy_env");
                return NULL;
        }
        memset(e, 0, sizeof(*e));
        e->status = VINYL_OFFLINE;
        e->timeout = TIMEOUT_INFINITY;
        e->force_recovery = force_recovery;
        e->path = strdup(path);
        if (e->path == NULL) {
                diag_set(OutOfMemory, strlen(path),
                         "malloc", "env->path");
                goto error_path;
        }

        e->xm = vy_tx_manager_new();
        if (e->xm == NULL)
                goto error_xm;
        e->squash_queue = vy_squash_queue_new();
        if (e->squash_queue == NULL)
                goto error_squash_queue;

        vy_stmt_env_create(&e->stmt_env);
        vy_mem_env_create(&e->mem_env, memory);
        vy_scheduler_create(&e->scheduler, write_threads,
                            vy_env_dump_complete_cb,
                            &e->run_env, &e->xm->read_views,
                            &e->quota);

        if (vy_lsm_env_create(&e->lsm_env, e->path,
                              &e->scheduler.generation,
                              e->stmt_env.key_format,
                              vy_squash_schedule, e) != 0)
                goto error_lsm_env;

        vy_quota_create(&e->quota, memory, vy_env_quota_exceeded_cb);
        vy_regulator_create(&e->regulator, &e->quota,
                            vy_env_trigger_dump_cb);

        struct slab_cache *slab_cache = cord_slab_cache();
        mempool_create(&e->iterator_pool, slab_cache,
                       sizeof(struct vinyl_iterator));
        vy_cache_env_create(&e->cache_env, slab_cache);
        vy_run_env_create(&e->run_env, read_threads);
        vy_log_init(e->path);
        return e;

error_lsm_env:
        vy_mem_env_destroy(&e->mem_env);
        vy_scheduler_destroy(&e->scheduler);
        vy_squash_queue_delete(e->squash_queue);
error_squash_queue:
        vy_tx_manager_delete(e->xm);
error_xm:
        free(e->path);
error_path:
        free(e);
        return NULL;
}

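/** Destroy a vinyl environment and free all associated resources. */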
static void
vy_env_delete(struct vy_env *e)
{
        vy_regulator_destroy(&e->regulator);
        vy_scheduler_destroy(&e->scheduler);
        vy_squash_queue_delete(e->squash_queue);
        vy_tx_manager_delete(e->xm);
        free(e->path);
        mempool_destroy(&e->iterator_pool);
        vy_run_env_destroy(&e->run_env);
        vy_lsm_env_destroy(&e->lsm_env);
        vy_mem_env_destroy(&e->mem_env);
        vy_cache_env_destroy(&e->cache_env);
        vy_stmt_env_destroy(&e->stmt_env);
        vy_quota_destroy(&e->quota);
        if (e->recovery != NULL)
                vy_recovery_delete(e->recovery);
        vy_log_free();
        TRASH(e);
        free(e);
}

2720
 * Called upon local recovery completion to enable memory quota
2721
 * and start the scheduler.
2722
 */
2723
static void
2724
vy_env_complete_recovery(struct vy_env *env)
3,344✔
2725
{
2726
        vy_scheduler_start(&env->scheduler);
3,344✔
2727
        vy_quota_enable(&env->quota);
3,344✔
2728
        vy_regulator_start(&env->regulator);
3,344✔
2729
}
3,344✔
2730

2731
struct engine *
2732
vinyl_engine_new(const char *dir, size_t memory,
3,383✔
2733
                 int read_threads, int write_threads, bool force_recovery)
2734
{
2735
        struct vy_env *env = vy_env_new(dir, memory, read_threads,
3,383✔
2736
                                        write_threads, force_recovery);
2737
        if (env == NULL)
3,383!
2738
                return NULL;
×
2739

2740
        env->base.vtab = &vinyl_engine_vtab;
3,383✔
2741
        env->base.name = "vinyl";
3,383✔
2742
        env->base.flags = ENGINE_TXM_HANDLES_DDL;
3,383✔
2743
        return &env->base;
3,383✔
2744
}
2745

2746
static void
vinyl_engine_shutdown(struct engine *engine)
{
        struct vy_env *env = vy_env(engine);
        vy_env_delete(env);
}

void
vinyl_engine_set_cache(struct engine *engine, size_t quota)
{
        struct vy_env *env = vy_env(engine);
        vy_cache_env_set_quota(&env->cache_env, quota);
}

int
vinyl_engine_set_memory(struct engine *engine, size_t size)
{
        struct vy_env *env = vy_env(engine);
        if (size < env->quota.limit) {
                diag_set(ClientError, ER_CFG, "vinyl_memory",
                         "cannot decrease memory size at runtime");
                return -1;
        }
        vy_regulator_set_memory_limit(&env->regulator, size);
        return 0;
}

void
vinyl_engine_set_max_tuple_size(struct engine *engine, size_t max_size)
{
        struct vy_env *env = vy_env(engine);
        env->stmt_env.max_tuple_size = max_size;
}

void
vinyl_engine_set_timeout(struct engine *engine, double timeout)
{
        struct vy_env *env = vy_env(engine);
        env->timeout = timeout;
}

void
vinyl_engine_set_too_long_threshold(struct engine *engine,
                                    double too_long_threshold)
{
        struct vy_env *env = vy_env(engine);
        env->quota.too_long_threshold = too_long_threshold;
        env->lsm_env.too_long_threshold = too_long_threshold;
}

void
vinyl_engine_set_snap_io_rate_limit(struct engine *engine, double limit)
{
        struct vy_env *env = vy_env(engine);
        int64_t limit_in_bytes = limit * 1024 * 1024;
        env->run_env.snap_io_rate_limit = limit_in_bytes;
        vy_regulator_reset_dump_bandwidth(&env->regulator, limit_in_bytes);
}

/* }}} Environment */

/* {{{ Checkpoint */

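/**
 * Begin a checkpoint: wake up the scheduler so that it starts
 * dumping in-memory data to disk.
 */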
static int
vinyl_engine_begin_checkpoint(struct engine *engine, bool is_scheduled)
{
        struct vy_env *env = vy_env(engine);
        assert(env->status == VINYL_ONLINE);
        /*
         * The scheduler starts worker threads upon the first wakeup.
         * To avoid starting the threads for nothing, do not wake it
         * up if Vinyl is not used.
         */
        if (lsregion_used(&env->mem_env.allocator) == 0)
                return 0;
        if (vy_scheduler_begin_checkpoint(&env->scheduler, is_scheduled) != 0)
                return -1;
        return 0;
}

static int
vinyl_engine_wait_checkpoint(struct engine *engine,
                             const struct vclock *vclock)
{
        struct vy_env *env = vy_env(engine);
        assert(env->status == VINYL_ONLINE);
        if (vy_scheduler_wait_checkpoint(&env->scheduler) != 0)
                return -1;
        if (vy_log_rotate(vclock) != 0)
                return -1;
        return 0;
}

static void
vinyl_engine_commit_checkpoint(struct engine *engine,
                               const struct vclock *vclock)
{
        (void)vclock;
        struct vy_env *env = vy_env(engine);
        assert(env->status == VINYL_ONLINE);
        vy_scheduler_end_checkpoint(&env->scheduler);
}

static void
vinyl_engine_abort_checkpoint(struct engine *engine)
{
        struct vy_env *env = vy_env(engine);
        assert(env->status == VINYL_ONLINE);
        vy_scheduler_end_checkpoint(&env->scheduler);
}

/* }}} Checkpoint */

/* {{{ Recovery */

/**
 * Install a trigger on the _vinyl_deferred_delete system space.
 * Called on bootstrap and recovery. Note, this function can't
 * be called from the engine constructor, because the latter is
 * invoked before the schema is initialized.
 */
static void
vy_set_deferred_delete_trigger(void)
{
        struct space *space = space_by_id(BOX_VINYL_DEFERRED_DELETE_ID);
        assert(space != NULL);
        trigger_add(&space->on_replace, &on_replace_vinyl_deferred_delete);
}

static int
vinyl_engine_bootstrap(struct engine *engine)
{
        vy_set_deferred_delete_trigger();

        struct vy_env *e = vy_env(engine);
        assert(e->status == VINYL_OFFLINE);
        if (vy_log_bootstrap() != 0)
                return -1;
        vy_env_complete_recovery(e);
        e->status = VINYL_ONLINE;
        return 0;
}

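/**
 * Begin initial recovery, either from the local directory
 * (recovery_vclock != NULL) or from a remote master (bootstrap
 * via initial join).
 */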
static int
vinyl_engine_begin_initial_recovery(struct engine *engine,
                                    const struct vclock *recovery_vclock)
{
        vy_set_deferred_delete_trigger();

        struct vy_env *e = vy_env(engine);
        assert(e->status == VINYL_OFFLINE);
        if (recovery_vclock != NULL) {
                e->recovery_vclock = recovery_vclock;
                e->recovery = vy_log_begin_recovery(recovery_vclock,
                                                    e->force_recovery);
                if (e->recovery == NULL)
                        return -1;
                /*
                 * We can't schedule any background tasks until
                 * local recovery is complete, because they would
                 * disrupt yet to be recovered data stored on disk.
                 * So we don't start the scheduler fiber or enable
                 * the memory limit until then.
                 *
                 * This is OK, because during recovery an instance
                 * can't consume more memory than it used before
                 * restart and hence memory dumps are not necessary.
                 */
                e->status = VINYL_INITIAL_RECOVERY_LOCAL;
        } else {
                if (vy_log_bootstrap() != 0)
                        return -1;
                vy_env_complete_recovery(e);
                e->status = VINYL_INITIAL_RECOVERY_REMOTE;
                e->run_env.initial_join = true;
        }
        return 0;
}

static int
vinyl_engine_begin_final_recovery(struct engine *engine)
{
        struct vy_env *e = vy_env(engine);
        switch (e->status) {
        case VINYL_INITIAL_RECOVERY_LOCAL:
                e->status = VINYL_FINAL_RECOVERY_LOCAL;
                break;
        case VINYL_INITIAL_RECOVERY_REMOTE:
                e->status = VINYL_FINAL_RECOVERY_REMOTE;
                e->run_env.initial_join = false;
                break;
        default:
                unreachable();
        }
        return 0;
}

2945
 * Vinyl doesn't support the hot standby mode so we raise an error on
2946
 * an attempt to enter the hot standby mode in case the instance has
2947
 * Vinyl spaces.  We also have a check in vinyl_index_open() that fails
2948
 * on an attempt to create a Vinyl space in the hot standby mode.
2949
 */
2950
static int
2951
vinyl_engine_begin_hot_standby(struct engine *engine)
3✔
2952
{
2953
        struct vy_env *e = vy_env(engine);
3✔
2954
        assert(e->status == VINYL_FINAL_RECOVERY_LOCAL);
3!
2955
        if (e->lsm_env.lsm_count > 0) {
3✔
2956
                diag_set(ClientError, ER_UNSUPPORTED, "Vinyl",
1!
2957
                         "hot standby mode");
2958
                return -1;
1✔
2959
        }
2960
        e->status = VINYL_HOT_STANDBY;
2✔
2961
        return 0;
2✔
2962
}
2963

2964
static int
2965
vinyl_engine_end_recovery(struct engine *engine)
1,141✔
2966
{
2967
        struct vy_env *e = vy_env(engine);
1,141✔
2968
        switch (e->status) {
1,141!
2969
        case VINYL_FINAL_RECOVERY_LOCAL:
675✔
2970
        case VINYL_HOT_STANDBY:
2971
                if (vy_log_end_recovery() != 0)
675!
2972
                        return -1;
×
2973
                /*
2974
                 * If the instance is shut down while a dump or
2975
                 * compaction task is in progress, we'll get an
2976
                 * unfinished run file on disk, i.e. a run file
2977
                 * which was either not written to the end or not
2978
                 * inserted into a range. We need to delete such
2979
                 * runs on recovery.
2980
                 */
2981
                vy_gc(e, e->recovery, VY_GC_INCOMPLETE, INT64_MAX);
675✔
2982
                vy_recovery_delete(e->recovery);
675✔
2983
                e->recovery = NULL;
675✔
2984
                /*
2985
                 * During recovery we skip statements that have
2986
                 * been dumped to disk - see vy_is_committed() -
2987
                 * so it may turn out that vy_tx_manager::lsn stays
2988
                 * behind the instance vclock while we need it
2989
                 * to be up-to-date once recovery is complete,
2990
                 * because we use it while building an index to
2991
                 * skip statements inserted after build began -
2992
                 * see vinyl_space_build_index() - so we reset
2993
                 * it upon recovery completion.
2994
                 */
2995
                e->xm->lsn = vclock_sum(e->recovery_vclock);
675✔
2996
                e->recovery_vclock = NULL;
675✔
2997
                vy_env_complete_recovery(e);
675✔
2998
                break;
675✔
2999
        case VINYL_FINAL_RECOVERY_REMOTE:
466✔
3000
                break;
466✔
3001
        default:
×
3002
                unreachable();
×
3003
        }
3004
        /*
3005
         * Do not start reader threads if no LSM tree was recovered.
3006
         * The threads will be started lazily upon the first LSM tree
3007
         * creation, see vinyl_index_open().
3008
         */
3009
        if (e->lsm_env.lsm_count > 0)
1,141✔
3010
                vy_run_env_enable_coio(&e->run_env);
278✔
3011

3012
        e->status = VINYL_ONLINE;
1,141✔
3013
        return 0;
1,141✔
3014
}
3015

3016
/** }}} Recovery */
3017

3018
/** {{{ Replication */
3019

3020
struct vy_join_entry {
        /** Link in vy_join_ctx::entries. */
        struct rlist in_ctx;
        /** Iterator over the primary index of the space. */
        struct vy_read_iterator iterator;
};

struct vy_join_ctx {
        /** List of spaces to relay. Linked by vy_join_entry::in_ctx. */
        struct rlist entries;
        /** Read view at the time when the initial join started. */
        struct vy_read_view *rv;
};

#if defined(ENABLE_FETCH_SNAPSHOT_CURSOR)
#include "vinyl_checkpoint_join.c"
#else /* !defined(ENABLE_FETCH_SNAPSHOT_CURSOR) */

static int
vinyl_engine_prepare_checkpoint_join(struct engine *engine,
                                     struct engine_join_ctx *ctx)
{
        (void)engine;
        (void)ctx;
        unreachable();
        return -1;
}

static int
vinyl_engine_checkpoint_join(struct engine *engine, struct engine_join_ctx *ctx,
                             struct xstream *stream)
{
        (void)engine;
        (void)ctx;
        (void)stream;
        unreachable();
        return -1;
}

static void
vinyl_engine_complete_checkpoint_join(struct engine *engine,
                                      struct engine_join_ctx *ctx)
{
        (void)engine;
        (void)ctx;
}

#endif /* !defined(ENABLE_FETCH_SNAPSHOT_CURSOR) */

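/**
 * space_foreach() callback: open a read iterator over the primary
 * index of a non-local vinyl space for the initial join.
 */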
static int
vy_join_add_space(struct space *space, void *arg)
{
        struct vy_join_ctx *ctx = arg;
        if (!space_is_vinyl(space))
                return 0;
        if (space_is_local(space))
                return 0;
        struct index *pk = space_index(space, 0);
        if (pk == NULL)
                return 0;
        struct vy_lsm *lsm = vy_lsm(pk);
        struct vy_join_entry *entry = malloc(sizeof(*entry));
        if (entry == NULL) {
                diag_set(OutOfMemory, sizeof(*entry),
                         "malloc", "struct vy_join_entry");
                return -1;
        }
        vy_read_iterator_open(&entry->iterator, lsm, NULL, ITER_ALL,
                              lsm->env->empty_key,
                              (const struct vy_read_view **)&ctx->rv);
        /*
         * The space can be dropped while initial join is in progress.
         * Take a reference to the index to make sure the iterator stays
         * valid.
         */
        vy_lsm_ref(lsm);
        rlist_add_tail_entry(&ctx->entries, entry, in_ctx);
        return 0;
}

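/**
 * Prepare for the initial join: open a read view and create an
 * iterator over every vinyl space that is subject to replication.
 */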
static int
vinyl_engine_prepare_join(struct engine *engine, struct engine_join_ctx *arg)
{
        if (arg->cursor != NULL)
                return vinyl_engine_prepare_checkpoint_join(engine, arg);

        struct vy_env *env = vy_env(engine);
        struct vy_join_ctx *ctx = malloc(sizeof(*ctx));
        if (ctx == NULL) {
                diag_set(OutOfMemory, sizeof(*ctx),
                         "malloc", "struct vy_join_ctx");
                return -1;
        }
        rlist_create(&ctx->entries);
        ctx->rv = vy_tx_manager_read_view(env->xm, /*plsn=*/INT64_MAX);
        if (ctx->rv == NULL) {
                free(ctx);
                return -1;
        }
        arg->data[engine->id] = ctx;
        return space_foreach(vy_join_add_space, ctx);
}

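/** Encode a tuple as an INSERT request and send it to the join stream. */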
static int
vy_join_send_tuple(struct xstream *stream, uint32_t space_id,
                   struct tuple *tuple)
{
        struct request_replace_body body;
        request_replace_body_create(&body, space_id);

        struct xrow_header row;
        memset(&row, 0, sizeof(row));
        row.type = IPROTO_INSERT;

        row.bodycnt = 2;
        row.body[0].iov_base = &body;
        row.body[0].iov_len = sizeof(body);
        row.body[1].iov_base = (char *)tuple_data(tuple);
        row.body[1].iov_len = tuple_bsize(tuple);

        return xstream_write(stream, &row);
}

static int
vinyl_engine_join(struct engine *engine, struct engine_join_ctx *arg,
                  struct xstream *stream)
{
        if (arg->cursor != NULL)
                return vinyl_engine_checkpoint_join(engine, arg, stream);

        int loops = 0;
        struct vy_join_ctx *ctx = arg->data[engine->id];
        struct vy_join_entry *join_entry;
        rlist_foreach_entry(join_entry, &ctx->entries, in_ctx) {
                struct vy_read_iterator *it = &join_entry->iterator;
                int rc;
                struct vy_entry entry;
                while ((rc = vy_read_iterator_next(it, &entry)) == 0 &&
                       entry.stmt != NULL) {
                        if (vy_join_send_tuple(stream, it->lsm->space_id,
                                               entry.stmt) != 0)
                                return -1;
                }
                if (rc != 0)
                        return -1;
                if (++loops % VY_YIELD_LOOPS == 0)
                        fiber_sleep(0);
        }
        xstream_reset(stream);
        return 0;
}

static void
vinyl_engine_complete_join(struct engine *engine, struct engine_join_ctx *arg)
{
        if (arg->cursor != NULL)
                return vinyl_engine_complete_checkpoint_join(engine, arg);

        struct vy_env *env = vy_env(engine);
        struct vy_join_ctx *ctx = arg->data[engine->id];
        struct vy_join_entry *entry, *next;
        rlist_foreach_entry_safe(entry, &ctx->entries, in_ctx, next) {
                struct vy_lsm *lsm = entry->iterator.lsm;
                vy_read_iterator_close(&entry->iterator);
                vy_lsm_unref(lsm);
                free(entry);
        }
        vy_tx_manager_destroy_read_view(env->xm, ctx->rv);
        free(ctx);
}

/* }}} Replication */

/* {{{ Garbage collection */

/**
 * Given a record encoding information about a vinyl run, try to
 * delete the corresponding files. On success, write a "forget" record
 * to the log so that all information about the run is deleted on the
 * next log rotation.
 */
static void
vy_gc_run(struct vy_env *env,
          struct vy_lsm_recovery_info *lsm_info,
          struct vy_run_recovery_info *run_info)
{
        /* Try to delete files. */
        if (vy_run_remove_files(env->path, lsm_info->space_id,
                                lsm_info->index_id, run_info->id) != 0)
                return;

        /* Forget the run on success. */
        vy_log_tx_begin();
        vy_log_forget_run(run_info->id);
        /*
         * Leave the record in the vylog buffer on disk error.
         * If we fail to flush it before restart, we will retry
         * to delete the run file next time garbage collection
         * is invoked, which is harmless.
         */
        vy_log_tx_try_commit();
}

/**
 * Given a dropped or not fully built LSM tree, delete all its
 * ranges and slices and mark all its runs as dropped. Forget
 * the LSM tree if it has no associated objects.
 */
static void
vy_gc_lsm(struct vy_lsm_recovery_info *lsm_info)
{
        assert(lsm_info->drop_lsn >= 0 ||
               lsm_info->create_lsn < 0);

        vy_log_tx_begin();
        if (lsm_info->drop_lsn < 0) {
                lsm_info->drop_lsn = 0;
                vy_log_drop_lsm(lsm_info->id, 0);
        }
        struct vy_range_recovery_info *range_info;
        rlist_foreach_entry(range_info, &lsm_info->ranges, in_lsm) {
                struct vy_slice_recovery_info *slice_info;
                rlist_foreach_entry(slice_info, &range_info->slices, in_range)
                        vy_log_delete_slice(slice_info->id);
                vy_log_delete_range(range_info->id);
        }
        struct vy_run_recovery_info *run_info;
        rlist_foreach_entry(run_info, &lsm_info->runs, in_lsm) {
                if (lsm_info->create_lsn < 0)
                        run_info->is_incomplete = true;
                if (!run_info->is_dropped) {
                        run_info->is_dropped = true;
                        run_info->gc_lsn = lsm_info->drop_lsn;
                        vy_log_drop_run(run_info->id, run_info->gc_lsn);
                }
        }
        if (rlist_empty(&lsm_info->ranges) &&
            rlist_empty(&lsm_info->runs))
                vy_log_forget_lsm(lsm_info->id);
        vy_log_tx_try_commit();
}

/**
 * Delete unused run files stored in the recovery context.
 * @param env      Vinyl environment.
 * @param recovery Recovery context.
 * @param gc_mask  Specifies what kinds of runs to delete (see VY_GC_*).
 * @param gc_lsn   LSN of the oldest checkpoint to save.
 */
static void
vy_gc(struct vy_env *env, struct vy_recovery *recovery,
      unsigned int gc_mask, int64_t gc_lsn)
{
        int loops = 0;
        struct vy_lsm_recovery_info *lsm_info;
        rlist_foreach_entry(lsm_info, &recovery->lsms, in_recovery) {
                if ((lsm_info->drop_lsn >= 0 &&
                     (gc_mask & VY_GC_DROPPED) != 0) ||
                    (lsm_info->create_lsn < 0 &&
                     (gc_mask & VY_GC_INCOMPLETE) != 0))
                        vy_gc_lsm(lsm_info);

                struct vy_run_recovery_info *run_info;
                rlist_foreach_entry(run_info, &lsm_info->runs, in_lsm) {
                        if ((run_info->is_dropped &&
                             run_info->gc_lsn < gc_lsn &&
                             (gc_mask & VY_GC_DROPPED) != 0) ||
                            (run_info->is_incomplete &&
                             (gc_mask & VY_GC_INCOMPLETE) != 0)) {
                                vy_gc_run(env, lsm_info, run_info);
                        }
                        /* Yield periodically, not on every run. */
                        if (++loops % VY_YIELD_LOOPS == 0)
                                fiber_sleep(0);
                }
        }
}

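/**
 * Delete old metadata log files and run files that are not
 * needed by any checkpoint newer than the given vclock.
 */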
static void
vinyl_engine_collect_garbage(struct engine *engine, const struct vclock *vclock)
{
        struct vy_env *env = vy_env(engine);

        /* Cleanup old metadata log files. */
        vy_log_collect_garbage(vclock);

        /* Cleanup run files. */
        struct vy_recovery *recovery = vy_recovery_new(-1, 0);
        if (recovery == NULL) {
                say_error("failed to recover vylog for garbage collection");
                return;
        }
        vy_gc(env, recovery, VY_GC_DROPPED, vclock_sum(vclock));
        vy_recovery_delete(recovery);
}

/* }}} Garbage collection */

/* {{{ Backup */

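/**
 * Invoke the given callback for each file that needs to be backed
 * up for the given checkpoint: the metadata log and all run files
 * referenced by it.
 */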
static int
vinyl_engine_backup(struct engine *engine, const struct vclock *vclock,
                    engine_backup_cb cb, void *cb_arg)
{
        struct vy_env *env = vy_env(engine);

        /* Backup the metadata log. */
        const char *path = vy_log_backup_path(vclock);
        if (path == NULL)
                return 0; /* vinyl not used */
        if (cb(path, cb_arg) != 0)
                return -1;

        /* Backup run files. */
        struct vy_recovery *recovery;
        recovery = vy_recovery_new(vclock_sum(vclock),
                                   VY_RECOVERY_LOAD_CHECKPOINT);
        if (recovery == NULL) {
                say_error("failed to recover vylog for backup");
                return -1;
        }
        int rc = 0;
        int loops = 0;
        struct vy_lsm_recovery_info *lsm_info;
        rlist_foreach_entry(lsm_info, &recovery->lsms, in_recovery) {
                if (lsm_info->drop_lsn >= 0 || lsm_info->create_lsn < 0) {
                        /* Dropped or not yet built LSM tree. */
                        continue;
                }
                struct vy_run_recovery_info *run_info;
                rlist_foreach_entry(run_info, &lsm_info->runs, in_lsm) {
                        if (run_info->is_dropped || run_info->is_incomplete)
                                continue;
                        char path[PATH_MAX];
                        for (int type = 0; type < vy_file_MAX; type++) {
                                if (type == VY_FILE_RUN_INPROGRESS ||
                                    type == VY_FILE_INDEX_INPROGRESS)
                                        continue;
                                vy_run_snprint_path(path, sizeof(path),
                                                    env->path,
                                                    lsm_info->space_id,
                                                    lsm_info->index_id,
                                                    run_info->id, type);
                                rc = cb(path, cb_arg);
                                if (rc != 0)
                                        goto out;
                        }
                        /* Yield periodically, not on every run. */
                        if (++loops % VY_YIELD_LOOPS == 0)
                                fiber_sleep(0);
                }
        }
out:
        vy_recovery_delete(recovery);
        return rc;
}

/* }}} Backup */

/**
 * This structure represents a request to squash a sequence of
 * UPSERT statements by inserting the resulting REPLACE statement
 * after them.
 */
struct vy_squash {
        /** Next in vy_squash_queue->queue. */
        struct stailq_entry next;
        /** Vinyl environment. */
        struct vy_env *env;
        /** LSM tree this request is for. */
        struct vy_lsm *lsm;
        /** Key to squash upserts for. */
        struct vy_entry entry;
};

struct vy_squash_queue {
        /** Fiber doing background upsert squashing. */
        struct fiber *fiber;
        /** Used to wake up the fiber to process more requests. */
        struct fiber_cond cond;
        /** Queue of vy_squash objects to be processed. */
        struct stailq queue;
        /** Mempool for struct vy_squash. */
        struct mempool pool;
};

static struct vy_squash *
vy_squash_new(struct mempool *pool, struct vy_env *env,
              struct vy_lsm *lsm, struct vy_entry entry)
{
        struct vy_squash *squash;
        squash = mempool_alloc(pool);
        if (squash == NULL)
                return NULL;
        squash->env = env;
        vy_lsm_ref(lsm);
        squash->lsm = lsm;
        tuple_ref(entry.stmt);
        squash->entry = entry;
        return squash;
}

static void
vy_squash_delete(struct mempool *pool, struct vy_squash *squash)
{
        vy_lsm_unref(squash->lsm);
        tuple_unref(squash->entry.stmt);
        mempool_free(pool, squash);
}

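/**
 * Process a single squash request: read the latest committed
 * value for the key and insert the resulting REPLACE statement
 * back into the in-memory index.
 */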
static int
vy_squash_process(struct vy_squash *squash)
{
        struct errinj *inj = errinj(ERRINJ_VY_SQUASH_TIMEOUT, ERRINJ_DOUBLE);
        if (inj != NULL && inj->dparam > 0)
                fiber_sleep(inj->dparam);

        struct vy_lsm *lsm = squash->lsm;
        struct vy_env *env = squash->env;

        /* Upserts are enabled only in the primary index LSM tree. */
        assert(lsm->index_id == 0);

        /*
         * Use the committed read view to avoid squashing
         * prepared but not yet committed statements.
         */
        struct vy_entry result;
        if (vy_point_lookup(lsm, NULL, &env->xm->p_committed_read_view,
                            squash->entry, &result) != 0)
                return -1;
        if (result.stmt == NULL)
                return 0;

        /*
         * While we yielded in vy_point_lookup() above, the memory
         * generation could have been bumped so we might need to
         * rotate the active in-memory index.
         */
        if (vy_lsm_rotate_mem_if_required(lsm) != 0) {
                tuple_unref(result.stmt);
                return -1;
        }

        /*
         * While we were reading on-disk runs, new statements could
         * have been prepared for the squashed key. We mustn't apply
         * them, because they may be rolled back, but we must adjust
         * their n_upserts counter so that they will get squashed by
         * vy_lsm_commit_upsert().
         */
        struct vy_mem *mem = lsm->mem;
        struct vy_mem_tree_key tree_key = {
                .entry = result,
                .lsn = vy_stmt_lsn(result.stmt),
        };
        struct vy_mem_tree_iterator mem_itr =
                vy_mem_tree_lower_bound(&mem->tree, &tree_key, NULL);
        if (vy_mem_tree_iterator_is_invalid(&mem_itr)) {
                /*
                 * The in-memory tree we are squashing an upsert
                 * for was dumped, nothing to do.
                 */
                tuple_unref(result.stmt);
                return 0;
        }
        vy_mem_tree_iterator_prev(&mem->tree, &mem_itr);
        uint8_t n_upserts = 0;
        while (!vy_mem_tree_iterator_is_invalid(&mem_itr)) {
                struct vy_entry mem_entry;
                mem_entry = *vy_mem_tree_iterator_get_elem(&mem->tree, &mem_itr);
                if (vy_entry_compare(result, mem_entry, lsm->cmp_def) != 0 ||
                    vy_stmt_type(mem_entry.stmt) != IPROTO_UPSERT)
                        break;
                assert(vy_stmt_is_prepared(mem_entry.stmt));
                vy_stmt_set_n_upserts(mem_entry.stmt, n_upserts);
                if (n_upserts <= VY_UPSERT_THRESHOLD)
                        ++n_upserts;
                vy_mem_tree_iterator_prev(&mem->tree, &mem_itr);
        }

        lsm->stat.upsert.squashed++;

        /*
         * Insert the resulting REPLACE statement into the mem
         * and adjust the quota.
         */
        size_t mem_used_before = lsregion_used(&env->mem_env.allocator);
        struct tuple *region_stmt = NULL;
        int rc = vy_lsm_set(lsm, mem, result, &region_stmt);
        tuple_unref(result.stmt);
        result.stmt = region_stmt;
        size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
        assert(mem_used_after >= mem_used_before);
        if (rc == 0) {
                /*
                 * We don't modify the resulting statement,
                 * so there's no need to invalidate the cache.
                 */
                vy_mem_commit_stmt(mem, result);
                vy_quota_force_use(&env->quota, VY_QUOTA_CONSUMER_TX,
                                   mem_used_after - mem_used_before);
                vy_regulator_check_dump_watermark(&env->regulator);
        }
        return rc;
}

static struct vy_squash_queue *
vy_squash_queue_new(void)
{
        struct vy_squash_queue *sq = malloc(sizeof(*sq));
        if (sq == NULL) {
                diag_set(OutOfMemory, sizeof(*sq), "malloc", "sq");
                return NULL;
        }
        sq->fiber = NULL;
        fiber_cond_create(&sq->cond);
        stailq_create(&sq->queue);
        mempool_create(&sq->pool, cord_slab_cache(),
                       sizeof(struct vy_squash));
        return sq;
}

static void
vy_squash_queue_delete(struct vy_squash_queue *sq)
{
        if (sq->fiber != NULL) {
                sq->fiber = NULL;
                /* Sic: fiber_cancel() can't be used here */
                fiber_cond_signal(&sq->cond);
        }
        struct vy_squash *squash, *next;
        stailq_foreach_entry_safe(squash, next, &sq->queue, next)
                vy_squash_delete(&sq->pool, squash);
        free(sq);
}

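/** Background fiber function processing queued squash requests. */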
static int
vy_squash_queue_f(va_list va)
{
        struct vy_squash_queue *sq = va_arg(va, struct vy_squash_queue *);
        while (sq->fiber != NULL) {
                fiber_check_gc();
                if (stailq_empty(&sq->queue)) {
                        fiber_cond_wait(&sq->cond);
                        continue;
                }
                struct vy_squash *squash;
                squash = stailq_shift_entry(&sq->queue, struct vy_squash, next);
                if (vy_squash_process(squash) != 0)
                        diag_log();
                vy_squash_delete(&sq->pool, squash);
        }
        return 0;
}

/*
 * For a given UPSERT statement, insert the resulting REPLACE
 * statement after it. Done in a background fiber.
 */
static void
vy_squash_schedule(struct vy_lsm *lsm, struct vy_entry entry, void *arg)
{
        struct vy_env *env = arg;
        struct vy_squash_queue *sq = env->squash_queue;

        say_verbose("%s: schedule upsert optimization for %s",
                    vy_lsm_name(lsm), vy_stmt_str(entry.stmt));

        /* Start the upsert squashing fiber on demand. */
        if (sq->fiber == NULL) {
                sq->fiber = fiber_new_system("vinyl.squash_queue",
                                             vy_squash_queue_f);
                if (sq->fiber == NULL)
                        goto fail;
                fiber_start(sq->fiber, sq);
        }

        struct vy_squash *squash = vy_squash_new(&sq->pool, env, lsm, entry);
        if (squash == NULL)
                goto fail;

        stailq_add_tail_entry(&sq->queue, squash, next);
        fiber_cond_signal(&sq->cond);
        return;
fail:
        diag_log();
        diag_clear(diag_get());
}

/* {{{ Cursor */

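/**
 * Trigger callback fired when the transaction a cursor is bound
 * to is destroyed: detaches the iterator from the transaction.
 */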
static int
vinyl_iterator_on_tx_destroy(struct trigger *trigger, void *event)
{
        (void)event;
        struct vinyl_iterator *it = container_of(trigger,
                        struct vinyl_iterator, on_tx_destroy);
        it->tx = NULL;
        return 0;
}

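/**
 * Close the underlying read iterator, dissociate the cursor from
 * its transaction and mark the iterator exhausted.
 */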
static void
vinyl_iterator_close(struct vinyl_iterator *it)
{
        vy_read_iterator_close(&it->iterator);
        tuple_unref(it->key.stmt);
        it->key = vy_entry_none();
        if (it->tx == &it->tx_autocommit) {
                /*
                 * Rollback the automatic transaction.
                 * Use vy_tx_destroy() so as not to spoil
                 * the statistics of rollbacks issued by
                 * user transactions.
                 */
                vy_tx_destroy(it->tx);
        } else {
                trigger_clear(&it->on_tx_destroy);
        }
        it->tx = NULL;
        it->base.next = exhausted_iterator_next;
}

/**
 * Check if the transaction associated with the iterator is
 * still valid.
 */
static int
vinyl_iterator_check_tx(struct vinyl_iterator *it)
{
        bool no_transaction =
                /* Transaction ended or cursor was closed. */
                it->tx == NULL ||
                /* Iterator was passed to another fiber. */
                (it->tx != &it->tx_autocommit &&
                 (in_txn() == NULL || it->tx != in_txn()->engine_tx));

        if (no_transaction) {
                diag_set(ClientError, ER_CURSOR_NO_TRANSACTION);
                return -1;
        }
        if (it->tx->state == VINYL_TX_ABORT) {
                /* Transaction read view was aborted. */
                diag_set(ClientError, ER_READ_VIEW_ABORTED);
                return -1;
        }
        return 0;
}

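/**
 * Account a completed read in the LSM tree statistics and warn
 * if it took longer than the configured threshold.
 */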
static void
vinyl_iterator_account_read(struct vinyl_iterator *it, double start_time,
                            struct tuple *result)
{
        struct vy_lsm *lsm = it->iterator.lsm;
        struct tuple *key = it->iterator.key.stmt;
        enum iterator_type type = it->iterator.iterator_type;

        double latency = ev_monotonic_now(loop()) - start_time;
        latency_collect(&lsm->stat.latency, latency);

        if (latency > lsm->env->too_long_threshold) {
                say_warn_ratelimited("%s: select(%s, %s) => %s "
                                     "took too long: %.3f sec",
                                     vy_lsm_name(lsm), tuple_str(key),
                                     iterator_type_strs[type],
                                     tuple_str(result), latency);
        }
        if (result != NULL)
                vy_stmt_counter_acct_tuple(&lsm->stat.get, result);
}

/** Updates the iterator position returned by vinyl_iterator_position(). */
static void
vinyl_iterator_update_pos(struct vinyl_iterator *it, struct vy_entry entry)
{
        assert(entry.stmt != NULL);
        if (it->pos.stmt != NULL)
                tuple_unref(it->pos.stmt);
        it->pos = entry;
        tuple_ref(entry.stmt);
}

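/** iterator::next() implementation for a primary index cursor. */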
static int
vinyl_iterator_primary_next(struct iterator *base, struct tuple **ret)
{
        double start_time = ev_monotonic_now(loop());

        assert(base->next == vinyl_iterator_primary_next);
        struct vinyl_iterator *it = (struct vinyl_iterator *)base;
        struct vy_lsm *lsm = it->iterator.lsm;
        assert(lsm->index_id == 0);
        /*
         * Make sure the LSM tree isn't deleted while we are
         * reading from it.
         */
        vy_lsm_ref(lsm);

        if (vinyl_iterator_check_tx(it) != 0)
                goto fail;

        struct vy_entry entry;
        if (vy_read_iterator_next(&it->iterator, &entry) != 0)
                goto fail;
        vy_read_iterator_cache_add(&it->iterator, entry);
        vinyl_iterator_account_read(it, start_time, entry.stmt);
        if (entry.stmt == NULL) {
                /* EOF. Close the iterator immediately. */
                vinyl_iterator_close(it);
        } else {
                vinyl_iterator_update_pos(it, entry);
                tuple_bless(entry.stmt);
        }
        *ret = entry.stmt;
        vy_lsm_unref(lsm);
        return 0;
fail:
        vinyl_iterator_close(it);
        vy_lsm_unref(lsm);
        return -1;
}

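/**
 * iterator::next() implementation for a secondary index cursor:
 * reads a partial tuple from the secondary index and then fetches
 * the full tuple from the primary index.
 */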
static int
vinyl_iterator_secondary_next(struct iterator *base, struct tuple **ret)
{
        double start_time = ev_monotonic_now(loop());

        assert(base->next == vinyl_iterator_secondary_next);
        struct vinyl_iterator *it = (struct vinyl_iterator *)base;
        struct vy_lsm *lsm = it->iterator.lsm;
        assert(lsm->index_id > 0);
        /*
         * Make sure the LSM tree isn't deleted while we are
         * reading from it.
         */
        vy_lsm_ref(lsm);

        struct vy_entry partial, entry;
next:
        if (vinyl_iterator_check_tx(it) != 0)
                goto fail;

        if (vy_read_iterator_next(&it->iterator, &partial) != 0)
                goto fail;

        if (partial.stmt == NULL) {
                /* EOF. Close the iterator immediately. */
                vy_read_iterator_cache_add(&it->iterator, vy_entry_none());
                vinyl_iterator_account_read(it, start_time, NULL);
                vinyl_iterator_close(it);
                *ret = NULL;
                goto out;
        }
        /* Get the full tuple from the primary index. */
        if (vy_get_by_secondary_tuple(lsm, it->tx, vy_tx_read_view(it->tx),
                                      partial, &entry) != 0)
                goto fail;
        if (entry.stmt == NULL)
                goto next;
        vy_read_iterator_cache_add(&it->iterator, entry);
        vinyl_iterator_account_read(it, start_time, entry.stmt);
        vinyl_iterator_update_pos(it, entry);
        *ret = entry.stmt;
        tuple_bless(*ret);
        tuple_unref(*ret);
out:
        vy_lsm_unref(lsm);
        return 0;
fail:
        vinyl_iterator_close(it);
        vy_lsm_unref(lsm);
        return -1;
}

/** Implementation of the iterator::position() method. */
static int
vinyl_iterator_position(struct iterator *base, const char **pos, uint32_t *size)
{
        struct vinyl_iterator *it = (struct vinyl_iterator *)base;
        /* Sic: can't use it->lsm here, because the iterator may be closed. */
        struct vy_lsm *lsm = vy_lsm(base->index);
        struct key_def *cmp_def = lsm->cmp_def;
        struct vy_entry entry = it->pos;
        if (entry.stmt == NULL) {
                *pos = NULL;
                *size = 0;
                return 0;
        }
        const char *key = tuple_extract_key(
                        entry.stmt, cmp_def,
                        vy_entry_multikey_idx(entry, cmp_def), size);
        if (key == NULL)
                return -1;
        *pos = key;
        return 0;
}

static void
vinyl_iterator_free(struct iterator *base)
{
        assert(base->free == vinyl_iterator_free);
        struct vinyl_iterator *it = (struct vinyl_iterator *)base;
        if (base->next != exhausted_iterator_next)
                vinyl_iterator_close(it);
        if (it->pos.stmt != NULL)
                tuple_unref(it->pos.stmt);
        mempool_free(it->pool, it);
}

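/**
 * Create a cursor over a vinyl index. If there is no active
 * transaction, the cursor opens its own autocommit transaction
 * for the duration of the iteration.
 */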
static struct iterator *
vinyl_index_create_iterator(struct index *base, enum iterator_type type,
                            const char *key, uint32_t part_count,
                            const char *pos)
{
        struct vy_lsm *lsm = vy_lsm(base);
        struct vy_env *env = vy_env(base->engine);

        if (type > ITER_GT) {
                diag_set(UnsupportedIndexFeature, base->def,
                         "requested iterator type");
                return NULL;
        }

        struct vy_tx *tx = in_txn() ? in_txn()->engine_tx : NULL;
        if (tx != NULL && tx->state == VINYL_TX_ABORT) {
                diag_set(ClientError, ER_TRANSACTION_CONFLICT);
                return NULL;
        }

        ERROR_INJECT(ERRINJ_INDEX_ITERATOR_NEW, {
                diag_set(ClientError, ER_INJECTION, "iterator fail");
                return NULL;
        });

        struct vinyl_iterator *it = mempool_alloc(&env->iterator_pool);
        if (it == NULL) {
                diag_set(OutOfMemory, sizeof(struct vinyl_iterator),
                         "mempool", "struct vinyl_iterator");
                return NULL;
        }
        it->key = vy_entry_key_new(lsm->env->key_format, lsm->cmp_def,
                                   key, part_count);
        if (it->key.stmt == NULL)
                goto err_key;
        struct vy_entry last;
        if (pos != NULL) {
                last = vy_entry_key_new(lsm->env->key_format, lsm->cmp_def,
                                        pos, lsm->cmp_def->part_count);
                if (last.stmt == NULL)
                        goto err_pos;
        } else {
                last = vy_entry_none();
        }

        iterator_create(&it->base, base);
        if (lsm->index_id == 0)
                it->base.next = vinyl_iterator_primary_next;
        else
                it->base.next = vinyl_iterator_secondary_next;
        it->base.position = vinyl_iterator_position;
        it->base.free = vinyl_iterator_free;
        it->pool = &env->iterator_pool;
        it->pos = vy_entry_none();

        if (tx != NULL) {
                /*
                 * Register a trigger that will abort this iterator
                 * when the transaction ends.
                 */
                trigger_create(&it->on_tx_destroy,
                               vinyl_iterator_on_tx_destroy, NULL, NULL);
                trigger_add(&tx->on_destroy, &it->on_tx_destroy);
        } else {
                tx = &it->tx_autocommit;
                vy_tx_create(env->xm, tx);
        }
        it->tx = tx;

        lsm->stat.lookup++;
        vy_read_iterator_open_after(
                &it->iterator, lsm, tx, type, it->key, last,
                (const struct vy_read_view **)&tx->read_view);
        return (struct iterator *)it;
err_pos:
        tuple_unref(it->key.stmt);
err_key:
        mempool_free(&env->iterator_pool, it);
        return NULL;
}

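/**
 * Point lookup by a full key in a unique index, using an
 * autocommit transaction if called outside of one.
 */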
static int
vinyl_index_get(struct index *index, const char *key,
                uint32_t part_count, struct tuple **ret)
{
        assert(index->def->opts.is_unique);
        assert(index->def->key_def->part_count == part_count);

        struct vy_lsm *lsm = vy_lsm(index);
        struct vy_env *env = vy_env(index->engine);
        struct vy_tx *tx = in_txn() ? in_txn()->engine_tx : NULL;
        if (tx != NULL && tx->state == VINYL_TX_ABORT) {
                diag_set(ClientError, ER_TRANSACTION_CONFLICT);
                return -1;
        }
        struct vy_tx tx_autocommit;
        if (tx == NULL) {
                tx = &tx_autocommit;
                vy_tx_create(env->xm, tx);
        }
        int rc = vy_get_by_raw_key(lsm, tx, vy_tx_read_view(tx),
                                   key, part_count, ret);
        if (tx == &tx_autocommit)
                vy_tx_destroy(tx);
        if (rc != 0)
                return -1;
        if (*ret != NULL) {
                tuple_bless(*ret);
                tuple_unref(*ret);
        }
        return 0;
}

/*** }}} Cursor */

/* {{{ Index build */

/** Argument passed to vy_build_on_replace(). */
struct vy_build_ctx {
        /** LSM tree under construction. */
        struct vy_lsm *lsm;
        /** Format to check new tuples against. */
        struct tuple_format *format;
        /**
         * Check the unique constraint of the new index
         * if this flag is set.
         */
        bool check_unique_constraint;
        /** Set in case a build error occurred. */
        bool is_failed;
        /** Container for storing errors. */
        struct diag diag;
};

/**
 * This is an on_replace trigger callback that forwards DML requests
 * to the index that is currently being built.
 */
static int
vy_build_on_replace(struct trigger *trigger, void *event)
{
        struct txn *txn = event;
        struct txn_stmt *stmt = txn_current_stmt(txn);
        struct vy_build_ctx *ctx = trigger->data;
        struct vy_tx *tx = txn->engine_tx;
        struct tuple_format *format = ctx->format;
        struct vy_lsm *lsm = ctx->lsm;

        if (ctx->is_failed)
                return 0; /* already failed, nothing to do */

        /* Check new tuples for conformity to the new format. */
        if (stmt->new_tuple != NULL &&
            tuple_validate(format, stmt->new_tuple) != 0)
                goto err;

        /* Check key uniqueness if necessary. */
        if (ctx->check_unique_constraint && stmt->new_tuple != NULL &&
            vy_check_is_unique_secondary(tx, vy_tx_read_view(tx),
                                         lsm, stmt->new_tuple) != 0)
                goto err;

        /* Forward the statement to the new LSM tree. */
        if (stmt->old_tuple != NULL) {
                struct tuple *delete = vy_stmt_new_surrogate_delete(format,
                                                        stmt->old_tuple);
                if (delete == NULL)
                        goto err;
                int rc = vy_tx_set(tx, lsm, delete);
                tuple_unref(delete);
                if (rc != 0)
                        goto err;
        }
        if (stmt->new_tuple != NULL) {
                uint32_t data_len;
                const char *data = tuple_data_range(stmt->new_tuple, &data_len);
                struct tuple *insert = vy_stmt_new_insert(format, data,
                                                          data + data_len);
                if (insert == NULL)
                        goto err;
                int rc = vy_tx_set(tx, lsm, insert);
                tuple_unref(insert);
                if (rc != 0)
                        goto err;
        }
        return 0;
err:
        /*
         * No need to abort the DDL request if this transaction
         * has been aborted as its changes won't be applied to
         * the space anyway. Besides, it might have been aborted
         * by the DDL request itself, in which case the build
         * context isn't valid and so we must not modify it.
         */
        if (tx->state == VINYL_TX_ABORT)
                return 0;
        ctx->is_failed = true;
        diag_move(diag_get(), &ctx->diag);
        return 0;
}

/**
 * Insert a single statement into the LSM tree that is currently
 * being built.
 */
static int
vy_build_insert_stmt(struct vy_lsm *lsm, struct vy_mem *mem,
                     struct tuple *stmt, int64_t lsn)
{
        struct tuple *region_stmt = vy_stmt_dup_lsregion(stmt,
                                &mem->env->allocator, mem->generation);
        if (region_stmt == NULL)
                return -1;
        vy_stmt_set_lsn(region_stmt, lsn);
        struct vy_entry entry;
        vy_stmt_foreach_entry(entry, region_stmt, lsm->cmp_def) {
                if (vy_mem_insert(mem, entry, &lsm->stat.memory.count) != 0)
                        return -1;
                vy_mem_commit_stmt(mem, entry);
        }
        return 0;
}

/**
 * Insert a tuple fetched from the space into the LSM tree that
 * is currently being built.
 */
static int
vy_build_insert_tuple(struct vy_env *env, struct vy_lsm *lsm,
                      struct tuple_format *new_format,
                      bool check_unique_constraint, struct tuple *tuple)
{
        int rc;
        struct vy_mem *mem = lsm->mem;
        int64_t lsn = vy_stmt_lsn(tuple);

        /* Check the tuple against the new space format. */
        if (tuple_validate(new_format, tuple) != 0)
                return -1;

        /* Reallocate the new tuple using the new space format. */
        uint32_t data_len;
        const char *data = tuple_data_range(tuple, &data_len);
        struct tuple *stmt = vy_stmt_new_replace(new_format, data,
                                                 data + data_len);
        if (stmt == NULL)
                return -1;

        /*
         * Check unique constraint if necessary.
         *
         * Note, this operation may yield, which opens a time
         * window for a concurrent fiber to insert a newer tuple
         * version. It's OK - we won't overwrite it, because the
         * LSN we use is less. However, we do need to make sure
         * we insert the tuple into the in-memory index that was
         * active before the yield, otherwise we might break the
         * invariant according to which newer in-memory indexes
         * store statements with greater LSNs. So we pin the
         * in-memory index that is active now and insert the tuple
         * into it after the yield.
         *
         * Also note that this operation is semantically a REPLACE
         * while the original tuple may have INSERT type. Since the
         * uniqueness check helper is sensitive to the statement
         * type, we must not use the original tuple for the check.
         */
        if (check_unique_constraint) {
                vy_mem_pin(mem);
                rc = vy_check_is_unique_secondary(
                        NULL, &env->xm->p_committed_read_view, lsm, stmt);
                vy_mem_unpin(mem);
                if (rc != 0) {
                        tuple_unref(stmt);
                        return -1;
                }
        }
        /*
         * Even though we protected mem from being dumped, its
         * generation may still be bumped due to rotation in
         * vy_tx_write_prepare() (insertions during index build
         * are still handled by the on_replace trigger). This may
         * happen if a dump is triggered (vy_scheduler_trigger_dump()).
         * Hence, to get the right mem (during rotation a mem becomes
         * sealed, i.e. read-only), we should fetch it from the lsm
         * again.
         */
        if (vy_lsm_rotate_mem_if_required(lsm) != 0) {
                tuple_unref(stmt);
                return -1;
        }
        mem = lsm->mem;

        /* Insert the new tuple into the in-memory index. */
        size_t mem_used_before = lsregion_used(&env->mem_env.allocator);
        rc = vy_build_insert_stmt(lsm, mem, stmt, lsn);
        tuple_unref(stmt);

        /* Consume memory quota. Throttle if it is exceeded. */
        size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
        assert(mem_used_after >= mem_used_before);
        vy_quota_force_use(&env->quota, VY_QUOTA_CONSUMER_DDL,
                           mem_used_after - mem_used_before);
        vy_regulator_check_dump_watermark(&env->regulator);
        vy_quota_wait(&env->quota, VY_QUOTA_CONSUMER_DDL);
        return rc;
}

/**
 * Recover a single statement that was inserted into the space
 * while the newly built index was dumped to disk.
 */
static int
vy_build_recover_stmt(struct vy_lsm *lsm, struct vy_lsm *pk,
                      struct vy_entry mem_entry)
{
        struct tuple *mem_stmt = mem_entry.stmt;
        int64_t lsn = vy_stmt_lsn(mem_stmt);
        if (lsn <= lsm->dump_lsn)
                return 0; /* statement was dumped, nothing to do */

        /* Lookup the tuple that was affected by this statement. */
        const struct vy_read_view rv = { .vlsn = lsn - 1 };
        const struct vy_read_view *p_rv = &rv;
        struct vy_entry old;
        if (vy_point_lookup(pk, NULL, &p_rv, mem_entry, &old) != 0)
                return -1;
        /*
         * Create DELETE + INSERT statements corresponding to
         * the given statement in the secondary index.
         */
        struct tuple *delete = NULL;
        struct tuple *insert = NULL;
        struct tuple *old_tuple = old.stmt;
        if (old_tuple != NULL) {
                delete = vy_stmt_new_surrogate_delete(lsm->mem_format,
                                                      old_tuple);
                if (delete == NULL) {
                        tuple_unref(old_tuple);
                        return -1;
                }
        }
        enum iproto_type type = vy_stmt_type(mem_stmt);
        if (type == IPROTO_REPLACE || type == IPROTO_INSERT) {
                uint32_t data_len;
                const char *data = tuple_data_range(mem_stmt, &data_len);
                insert = vy_stmt_new_insert(lsm->mem_format,
                                            data, data + data_len);
                if (insert == NULL)
                        goto err;
        } else if (type == IPROTO_UPSERT) {
                struct tuple *new_tuple = vy_apply_upsert(mem_stmt, old_tuple,
                                                          pk->cmp_def, true);
                if (new_tuple == NULL)
                        goto err;
                uint32_t data_len;
                const char *data = tuple_data_range(new_tuple, &data_len);
                insert = vy_stmt_new_insert(lsm->mem_format,
                                            data, data + data_len);
                tuple_unref(new_tuple);
                if (insert == NULL)
                        goto err;
        }

        if (old_tuple != NULL)
                tuple_unref(old_tuple);

        /* Insert DELETE + INSERT into the LSM tree. */
        if (delete != NULL) {
                int rc = vy_build_insert_stmt(lsm, lsm->mem, delete, lsn);
                tuple_unref(delete);
                if (rc != 0)
                        return -1;
        }
        if (insert != NULL) {
                int rc = vy_build_insert_stmt(lsm, lsm->mem, insert, lsn);
                tuple_unref(insert);
                if (rc != 0)
                        return -1;
        }
        return 0;

err:
        if (old_tuple != NULL)
                tuple_unref(old_tuple);
        if (delete != NULL)
                tuple_unref(delete);
        return -1;
}

/**
 * Recover all statements stored in the given in-memory index
 * that were inserted into the space while the newly built index
 * was dumped to disk.
 */
static int
vy_build_recover_mem(struct vy_lsm *lsm, struct vy_lsm *pk, struct vy_mem *mem)
{
        assert(lsm->dump_lsn >= 0);
        /*
         * Recover statements starting from the oldest one.
         * Key order doesn't matter so we simply iterate over
         * the in-memory index in reverse order.
         */
        struct vy_mem_tree_iterator itr;
        itr = vy_mem_tree_last(&mem->tree);
        while (!vy_mem_tree_iterator_is_invalid(&itr)) {
                struct vy_entry mem_entry;
                mem_entry = *vy_mem_tree_iterator_get_elem(&mem->tree, &itr);
                if (vy_build_recover_stmt(lsm, pk, mem_entry) != 0)
                        return -1;
                vy_mem_tree_iterator_prev(&mem->tree, &itr);
        }
        return 0;
}

/**
 * Recover the memory level of a newly built index.
 *
 * During the final dump of a newly built index, new statements may
 * be inserted into the space. If those statements are not dumped to
 * disk before restart, they won't be recovered from WAL, because at
 * the time they were generated the new index didn't exist. In order
 * to recover them, we replay all statements stored in the memory
 * level of the primary index.
 */
static int
vy_build_recover(struct vy_env *env, struct vy_lsm *lsm, struct vy_lsm *pk)
{
        if (lsm->dump_lsn < 0) {
                /*
                 * The new index was never dumped, because the space
                 * is empty, so there's nothing to do.
                 *
                 * Note: the primary index may still have some statements
                 * cancelling each other; we shouldn't try to apply them,
                 * because they may be incompatible with the new index.
                 */
                return 0;
        }

        int rc = 0;
        struct vy_mem *mem;
        size_t mem_used_before, mem_used_after;

        mem_used_before = lsregion_used(&env->mem_env.allocator);
        rlist_foreach_entry_reverse(mem, &pk->sealed, in_sealed) {
                rc = vy_build_recover_mem(lsm, pk, mem);
                if (rc != 0)
                        break;
        }
        if (rc == 0)
                rc = vy_build_recover_mem(lsm, pk, pk->mem);

        mem_used_after = lsregion_used(&env->mem_env.allocator);
        assert(mem_used_after >= mem_used_before);
        vy_quota_force_use(&env->quota, VY_QUOTA_CONSUMER_DDL,
                           mem_used_after - mem_used_before);
        return rc;
}

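/**
 * Build a new vinyl index: open it, then (unless we are recovering
 * from the local WAL) feed it every tuple stored in the space,
 * forwarding concurrent DML via an on_replace trigger, and finally
 * trigger a dump so the index doesn't have to be rebuilt on recovery.
 */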
static int
vinyl_space_build_index(struct space *src_space, struct index *new_index,
                        struct tuple_format *new_format,
                        bool check_unique_constraint)
{
        struct vy_env *env = vy_env(src_space->engine);
        struct vy_lsm *pk = vy_lsm(src_space->index[0]);
        struct txn *txn = in_txn();

        struct errinj *inj = errinj(ERRINJ_BUILD_INDEX, ERRINJ_INT);
        if (inj != NULL && inj->iparam == (int)new_index->def->iid) {
                diag_set(ClientError, ER_INJECTION, "build index");
                return -1;
        }

        if (vinyl_index_open(new_index) != 0)
                return -1;

        /* Set pointer to the primary key for the new index. */
        struct vy_lsm *new_lsm = vy_lsm(new_index);
        vy_lsm_update_pk(new_lsm, pk);

        if (env->status == VINYL_INITIAL_RECOVERY_LOCAL ||
            env->status == VINYL_FINAL_RECOVERY_LOCAL)
                return vy_build_recover(env, new_lsm, pk);

        /*
         * Transactions started before the space alter request can't
         * be checked with on_replace trigger so we abort them.
         */
        bool need_wal_sync;
        vy_tx_manager_abort_writers_for_ddl(env->xm, src_space, &need_wal_sync);

        if (!need_wal_sync && vy_lsm_is_empty(pk))
                return 0; /* space is empty, nothing to do */

        if (new_index->def->iid == 0) {
                diag_set(ClientError, ER_UNSUPPORTED, "Vinyl",
                         "rebuilding the primary index of a non-empty space");
                return -1;
        }

        if (txn_check_singlestatement(txn, "index build") != 0)
                return -1;

        /*
         * Iterate over all tuples stored in the space and insert
         * each of them into the new LSM tree. Since read iterator
         * may yield, we install an on_replace trigger to forward
         * DML requests issued during the build.
         */
        struct trigger on_replace;
        struct vy_build_ctx ctx;
        ctx.lsm = new_lsm;
        ctx.format = new_format;
        ctx.check_unique_constraint = check_unique_constraint;
        ctx.is_failed = false;
        diag_create(&ctx.diag);
        trigger_create(&on_replace, vy_build_on_replace, &ctx, NULL);
        trigger_add(&src_space->on_replace, &on_replace);

        /*
         * Flush transactions waiting on WAL after installing on_replace
         * trigger so that changes made by newer transactions are checked
         * by the trigger callback.
         */
        int rc;
        if (need_wal_sync) {
                rc = wal_sync(NULL);
                if (rc != 0)
                        goto out;
        }

        struct vy_read_iterator itr;
        vy_read_iterator_open(&itr, pk, NULL, ITER_ALL, pk->env->empty_key,
                              &env->xm->p_committed_read_view);
        int loops = 0;
        struct vy_entry entry;
        int64_t build_lsn = env->xm->lsn;
        while ((rc = vy_read_iterator_next(&itr, &entry)) == 0) {
                struct tuple *tuple = entry.stmt;
                if (tuple == NULL)
                        break;
                /*
                 * Insert the tuple into the new index unless it
                 * was inserted into the space after we started
                 * building the new index - in the latter case
                 * the new tuple has already been inserted by the
                 * on_replace trigger.
                 *
                 * Note, yield is not allowed between reading the
                 * tuple from the primary index and inserting it
                 * into the new index. If we yielded, the tuple
                 * could be overwritten by a concurrent transaction,
                 * in which case we would insert an outdated tuple.
                 */
                if (vy_stmt_lsn(tuple) <= build_lsn) {
                        rc = vy_build_insert_tuple(env, new_lsm, new_format,
                                                   check_unique_constraint,
                                                   tuple);
                        if (rc != 0)
                                break;
                }
                /*
                 * Read iterator yields only when it reads runs.
                 * Yield periodically in order not to stall the
                 * tx thread in case there are a lot of tuples in
                 * mems or cache.
                 */
                if (++loops % VY_YIELD_LOOPS == 0)
                        fiber_sleep(0);
                if (ctx.is_failed) {
                        diag_move(&ctx.diag, diag_get());
                        rc = -1;
                        break;
                }
                /*
                 * Sleep after one tuple is inserted to test
                 * on_replace triggers for index build.
                 */
                ERROR_INJECT_YIELD(ERRINJ_BUILD_INDEX_DELAY);
        }
        vy_read_iterator_close(&itr);

        /*
         * Dump the new index upon build completion so that we don't
         * have to rebuild it on recovery. No need to trigger dump if
         * the space happens to be empty.
         */
        if (rc == 0 && !vy_lsm_is_empty(new_lsm))
                rc = vy_scheduler_dump(&env->scheduler);

        if (rc == 0 && ctx.is_failed) {
                diag_move(&ctx.diag, diag_get());
                rc = -1;
        }
out:
        diag_destroy(&ctx.diag);
        trigger_clear(&on_replace);
        return rc;
}

/* }}} Index build */

/* {{{ Deferred DELETE handling */

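/**
 * On commit of a transaction that wrote deferred DELETEs, propagate
 * the WAL signature to vy_mem::dump_lsn so the statements can be
 * filtered out on recovery, and release the mem reference.
 */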
static int
vy_deferred_delete_on_commit(struct trigger *trigger, void *event)
{
        (void)event;
        struct txn *txn = in_txn();
        struct vy_mem *mem = trigger->data;
        /*
         * Update dump_lsn so that we can skip dumped deferred
         * DELETE statements on WAL recovery.
         */
        assert(mem->dump_lsn <= txn->signature);
        mem->dump_lsn = txn->signature;
        /* Unpin the mem pinned in vy_deferred_delete_on_replace(). */
        vy_mem_unpin(mem);
        return 0;
}

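/**
 * On rollback, just release the mem reference taken in
 * vy_deferred_delete_on_replace().
 */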
static int
vy_deferred_delete_on_rollback(struct trigger *trigger, void *event)
{
        (void)event;
        struct vy_mem *mem = trigger->data;
        /* Unpin the mem pinned in vy_deferred_delete_on_replace(). */
        vy_mem_unpin(mem);
        return 0;
}

/**
 * Callback invoked when a deferred DELETE statement is written
 * to _vinyl_deferred_delete system space. It extracts the
 * deleted tuple, its LSN, and the target space id from the
 * system space row, then generates a deferred DELETE statement
 * and inserts it into secondary indexes of the target space.
 *
 * Note, this callback is also invoked during local WAL recovery
 * to restore deferred DELETE statements that haven't been dumped
 * to disk. To skip deferred DELETEs that have been dumped, we
 * use the same technique we employ for normal WAL statements,
 * i.e. we filter them by LSN, see vy_is_committed(). To do
 * that, we need to account the LSN of a WAL row that generated
 * a deferred DELETE statement to vy_lsm::dump_lsn, so we install
 * an on_commit trigger that propagates the LSN of the WAL row to
 * vy_mem::dump_lsn, which in turn will contribute to
 * vy_lsm::dump_lsn when the in-memory tree is dumped, see
 * vy_task_dump_new().
 *
 * This implies that we don't yield between statements of the
 * same transaction, because if we did, two deferred DELETEs with
 * the same WAL LSN could land in different in-memory trees: if
 * one of the trees got dumped while the other didn't, we would
 * mistakenly skip both statements on recovery.
 */
static int
vy_deferred_delete_on_replace(struct trigger *trigger, void *event)
{
        (void)trigger;

        struct txn *txn = event;
        struct txn_stmt *stmt = txn_current_stmt(txn);
        bool is_first_statement = txn_is_first_statement(txn);

        if (stmt->new_tuple == NULL)
                return 0;
        /*
         * Extract space id, LSN of the deferred DELETE statement,
         * and the deleted tuple from the system space row.
         */
        struct tuple_iterator it;
        tuple_rewind(&it, stmt->new_tuple);
        uint32_t space_id;
        if (tuple_next_u32(&it, &space_id) != 0)
                return -1;
        uint64_t lsn;
        if (tuple_next_u64(&it, &lsn) != 0)
                return -1;
        const char *delete_data = tuple_next_with_type(&it, MP_ARRAY);
        if (delete_data == NULL)
                return -1;
        const char *delete_data_end = delete_data;
        mp_next(&delete_data_end);

        /* Look up the space. */
        struct space *space = space_cache_find(space_id);
        if (space == NULL)
                return -1;
        /*
         * All secondary indexes could have been dropped, in
         * which case we don't need to generate deferred DELETE
         * statements anymore.
         */
        if (space->index_count <= 1)
                return 0;

        /* Create the deferred DELETE statement. */
        struct vy_lsm *pk = vy_lsm(space->index[0]);
        struct tuple *delete = vy_stmt_new_delete(pk->mem_format, delete_data,
                                                  delete_data_end);
        if (delete == NULL)
                return -1;
        /*
         * A deferred DELETE may be generated after new statements
         * were committed for the deleted key. So we must use the
         * original LSN (not the one of the WAL row) when inserting
         * a deferred DELETE into an index to make sure that it will
         * purge the appropriate tuple on compaction. However, this
         * also breaks the read iterator invariant that states that
         * newer sources contain newer statements for the same key.
         * So we mark deferred DELETEs with the VY_STMT_SKIP_READ
         * flag, which makes the read iterator ignore them.
         */
        vy_stmt_set_lsn(delete, lsn);
        vy_stmt_set_flags(delete, VY_STMT_SKIP_READ);

        /* Insert the deferred DELETE into secondary indexes. */
        int rc = 0;
        struct vy_env *env = vy_env(space->engine);
        size_t mem_used_before = lsregion_used(&env->mem_env.allocator);
        struct tuple *region_stmt = NULL;
        for (uint32_t i = 1; i < space->index_count; i++) {
                struct vy_lsm *lsm = vy_lsm(space->index[i]);
                if (vy_is_committed(env, lsm))
                        continue;
                /*
                 * As usual, rotate the active in-memory index if
                 * schema was changed or dump was triggered. Do it
                 * only if processing the first statement, because
                 * dump may be triggered by one of the statements
                 * of this transaction (see vy_quota_force_use()
                 * below), in which case we must not do rotation
                 * as we want all statements to land in the same
                 * in-memory index. This is safe, as long as we
                 * don't yield between statements.
                 */
                struct vy_mem *mem = lsm->mem;
                if (is_first_statement) {
                        rc = vy_lsm_rotate_mem_if_required(lsm);
                        if (rc != 0)
                                break;
                        mem = lsm->mem;
                }
                struct vy_entry entry;
                vy_stmt_foreach_entry(entry, delete, lsm->cmp_def) {
                        rc = vy_lsm_set(lsm, mem, entry, &region_stmt);
                        if (rc != 0)
                                break;
                        entry.stmt = region_stmt;
                        vy_lsm_commit_stmt(lsm, mem, entry);
                }
                if (rc != 0)
                        break;

                if (!is_first_statement)
                        continue;
                /*
                 * If this is the first statement of this
                 * transaction, install on_commit trigger
                 * which will propagate the WAL row LSN to
                 * the LSM tree.
                 */
                size_t size;
                struct trigger *on_commit =
                        region_alloc_object(&txn->region, typeof(*on_commit),
                                            &size);
                if (on_commit == NULL) {
                        diag_set(OutOfMemory, size, "region_alloc_object",
                                 "on_commit");
                        rc = -1;
                        break;
                }
                struct trigger *on_rollback =
                        region_alloc_object(&txn->region, typeof(*on_rollback),
                                            &size);
                if (on_rollback == NULL) {
                        diag_set(OutOfMemory, size, "region_alloc_object",
                                 "on_rollback");
                        rc = -1;
                        break;
                }
                vy_mem_pin(mem);
                trigger_create(on_commit, vy_deferred_delete_on_commit, mem, NULL);
                trigger_create(on_rollback, vy_deferred_delete_on_rollback, mem, NULL);
                txn_on_commit(txn, on_commit);
                txn_on_rollback(txn, on_rollback);
        }
        size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
        assert(mem_used_after >= mem_used_before);
        vy_quota_force_use(&env->quota, VY_QUOTA_CONSUMER_COMPACTION,
                           mem_used_after - mem_used_before);
        vy_regulator_check_dump_watermark(&env->regulator);

        tuple_unref(delete);
        if (rc != 0)
                return -1;
        return 0;
}

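/** On_replace trigger installed on the _vinyl_deferred_delete system space. */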
static TRIGGER(on_replace_vinyl_deferred_delete, vy_deferred_delete_on_replace);

/* }}} Deferred DELETE handling */

static const struct engine_vtab vinyl_engine_vtab = {
        /* .shutdown = */ vinyl_engine_shutdown,
        /* .create_space = */ vinyl_engine_create_space,
        /* .create_read_view = */ generic_engine_create_read_view,
        /* .prepare_join = */ vinyl_engine_prepare_join,
        /* .join = */ vinyl_engine_join,
        /* .complete_join = */ vinyl_engine_complete_join,
        /* .begin = */ vinyl_engine_begin,
        /* .begin_statement = */ vinyl_engine_begin_statement,
        /* .prepare = */ vinyl_engine_prepare,
        /* .commit = */ vinyl_engine_commit,
        /* .rollback_statement = */ vinyl_engine_rollback_statement,
        /* .rollback = */ vinyl_engine_rollback,
        /* .switch_to_ro = */ vinyl_engine_switch_to_ro,
        /* .bootstrap = */ vinyl_engine_bootstrap,
        /* .begin_initial_recovery = */ vinyl_engine_begin_initial_recovery,
        /* .begin_final_recovery = */ vinyl_engine_begin_final_recovery,
        /* .begin_hot_standby = */ vinyl_engine_begin_hot_standby,
        /* .end_recovery = */ vinyl_engine_end_recovery,
        /* .begin_checkpoint = */ vinyl_engine_begin_checkpoint,
        /* .wait_checkpoint = */ vinyl_engine_wait_checkpoint,
        /* .commit_checkpoint = */ vinyl_engine_commit_checkpoint,
        /* .abort_checkpoint = */ vinyl_engine_abort_checkpoint,
        /* .collect_garbage = */ vinyl_engine_collect_garbage,
        /* .backup = */ vinyl_engine_backup,
        /* .memory_stat = */ vinyl_engine_memory_stat,
        /* .reset_stat = */ vinyl_engine_reset_stat,
        /* .check_space_def = */ vinyl_engine_check_space_def,
};

static const struct space_vtab vinyl_space_vtab = {
        /* .destroy = */ vinyl_space_destroy,
        /* .bsize = */ vinyl_space_bsize,
        /* .execute_replace = */ vinyl_space_execute_replace,
        /* .execute_delete = */ vinyl_space_execute_delete,
        /* .execute_update = */ vinyl_space_execute_update,
        /* .execute_upsert = */ vinyl_space_execute_upsert,
        /* .ephemeral_replace = */ generic_space_ephemeral_replace,
        /* .ephemeral_delete = */ generic_space_ephemeral_delete,
        /* .ephemeral_rowid_next = */ generic_space_ephemeral_rowid_next,
        /* .init_system_space = */ generic_init_system_space,
        /* .init_ephemeral_space = */ generic_init_ephemeral_space,
        /* .check_index_def = */ vinyl_space_check_index_def,
        /* .create_index = */ vinyl_space_create_index,
        /* .add_primary_key = */ vinyl_space_add_primary_key,
        /* .drop_primary_key = */ generic_space_drop_primary_key,
        /* .check_format = */ vinyl_space_check_format,
        /* .build_index = */ vinyl_space_build_index,
        /* .swap_index = */ vinyl_space_swap_index,
        /* .prepare_alter = */ vinyl_space_prepare_alter,
        /* .finish_alter = */ generic_space_finish_alter,
        /* .prepare_upgrade = */ generic_space_prepare_upgrade,
        /* .invalidate = */ vinyl_space_invalidate,
};

static const struct index_vtab vinyl_index_vtab = {
        /* .destroy = */ vinyl_index_destroy,
        /* .commit_create = */ vinyl_index_commit_create,
        /* .abort_create = */ vinyl_index_abort_create,
        /* .commit_modify = */ vinyl_index_commit_modify,
        /* .commit_drop = */ vinyl_index_commit_drop,
        /* .update_def = */ vinyl_index_update_def,
        /* .depends_on_pk = */ vinyl_index_depends_on_pk,
        /* .def_change_requires_rebuild = */
                vinyl_index_def_change_requires_rebuild,
        /* .size = */ vinyl_index_size,
        /* .bsize = */ vinyl_index_bsize,
        /* .min = */ generic_index_min,
        /* .max = */ generic_index_max,
        /* .random = */ generic_index_random,
        /* .count = */ generic_index_count,
        /* .get_internal = */ generic_index_get_internal,
        /* .get = */ vinyl_index_get,
        /* .replace = */ generic_index_replace,
        /* .create_iterator = */ vinyl_index_create_iterator,
        /* .create_read_view = */ generic_index_create_read_view,
        /* .stat = */ vinyl_index_stat,
        /* .compact = */ vinyl_index_compact,
        /* .reset_stat = */ vinyl_index_reset_stat,
        /* .begin_build = */ generic_index_begin_build,
        /* .reserve = */ generic_index_reserve,
        /* .build_next = */ generic_index_build_next,
        /* .end_build = */ generic_index_end_build,
};