• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / tarantool / #10867

pending completion
#10867

push

travis-ci

vanyail
sql: change of PRAGMA INDEX_INFO syntax

20 of 20 new or added lines in 1 file covered. (100.0%)

60052 of 71300 relevant lines covered (84.22%)

1504784.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.33
/src/box/vy_index.c
1
/*
2
 * Copyright 2010-2017, Tarantool AUTHORS, please see AUTHORS file.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 * 1. Redistributions of source code must retain the above
9
 *    copyright notice, this list of conditions and the
10
 *    following disclaimer.
11
 *
12
 * 2. Redistributions in binary form must reproduce the above
13
 *    copyright notice, this list of conditions and the following
14
 *    disclaimer in the documentation and/or other materials
15
 *    provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
18
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
19
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
21
 * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
22
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
25
 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
28
 * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
29
 * SUCH DAMAGE.
30
 */
31
#include "vy_index.h"
32

33
#include "trivia/util.h"
34
#include <stdbool.h>
35
#include <stddef.h>
36
#include <sys/stat.h>
37
#include <sys/types.h>
38

39
#include "assoc.h"
40
#include "diag.h"
41
#include "errcode.h"
42
#include "histogram.h"
43
#include "index_def.h"
44
#include "say.h"
45
#include "schema.h"
46
#include "tuple.h"
47
#include "vy_log.h"
48
#include "vy_mem.h"
49
#include "vy_range.h"
50
#include "vy_run.h"
51
#include "vy_stat.h"
52
#include "vy_stmt.h"
53
#include "vy_upsert.h"
54
#include "vy_read_set.h"
55

56
void
57
vy_index_validate_formats(const struct vy_index *index)
4,916✔
58
{
59
        (void) index;
60
        assert(index->disk_format != NULL);
4,916✔
61
        assert(index->mem_format != NULL);
4,916✔
62
        assert(index->mem_format_with_colmask != NULL);
4,916✔
63
        assert(index->upsert_format != NULL);
4,916✔
64
        uint32_t index_field_count = index->mem_format->index_field_count;
4,916✔
65
        (void) index_field_count;
66
        if (index->id == 0) {
4,916✔
67
                assert(index->disk_format == index->mem_format);
3,198✔
68
                assert(index->disk_format->index_field_count ==
3,198✔
69
                       index_field_count);
70
                assert(index->mem_format_with_colmask->index_field_count ==
3,198✔
71
                       index_field_count);
72
        } else {
73
                assert(index->disk_format != index->mem_format);
1,718✔
74
                assert(index->disk_format->index_field_count <=
1,718✔
75
                       index_field_count);
76
        }
77
        assert(index->upsert_format->index_field_count == index_field_count);
4,916✔
78
        assert(index->mem_format_with_colmask->index_field_count ==
4,916✔
79
               index_field_count);
80
}
4,916✔
81

82
/**
 * Initialize a common vinyl index environment.
 *
 * Creates the key-only tuple format and the empty SELECT statement
 * shared by all indexes, then stores the remaining parameters.
 *
 * @param path              Vinyl data directory (not copied, must
 *                          outlive the environment).
 * @param p_generation      Pointer to the current memory generation
 *                          counter, read when new in-memory trees
 *                          are allocated.
 * @param upsert_thresh_cb  Callback invoked when an upsert chain
 *                          grows too long.
 * @param upsert_thresh_arg Opaque argument for the callback.
 *
 * @retval 0   Success.
 * @retval -1  Out of memory (diag is set by the failed allocator).
 */
int
vy_index_env_create(struct vy_index_env *env, const char *path,
		    int64_t *p_generation,
		    vy_upsert_thresh_cb upsert_thresh_cb,
		    void *upsert_thresh_arg)
{
	env->key_format = tuple_format_new(&vy_tuple_format_vtab,
					   NULL, 0, 0, NULL, 0, NULL);
	if (env->key_format == NULL)
		return -1;
	tuple_format_ref(env->key_format);
	env->empty_key = vy_stmt_new_select(env->key_format, NULL, 0);
	if (env->empty_key == NULL) {
		/* Drop the reference taken above before bailing out. */
		tuple_format_unref(env->key_format);
		return -1;
	}
	env->path = path;
	env->p_generation = p_generation;
	env->upsert_thresh_cb = upsert_thresh_cb;
	env->upsert_thresh_arg = upsert_thresh_arg;
	env->too_long_threshold = TIMEOUT_INFINITY;
	env->index_count = 0;
	return 0;
}
106

107
/**
 * Release the resources allocated by vy_index_env_create().
 * The empty key statement is dropped before the key format —
 * presumably the statement pins the format (it was created from
 * it in vy_index_env_create()); keep this order.
 */
void
vy_index_env_destroy(struct vy_index_env *env)
{
	tuple_unref(env->empty_key);
	tuple_format_unref(env->key_format);
}
113

114
const char *
115
vy_index_name(struct vy_index *index)
2,429✔
116
{
117
        char *buf = tt_static_buf();
2,429✔
118
        snprintf(buf, TT_STATIC_BUF_LEN, "%u/%u",
2,429✔
119
                 (unsigned)index->space_id, (unsigned)index->id);
2,429✔
120
        return buf;
2,429✔
121
}
122

123
size_t
124
vy_index_mem_tree_size(struct vy_index *index)
289✔
125
{
126
        struct vy_mem *mem;
127
        size_t size = index->mem->tree_extent_size;
289✔
128
        rlist_foreach_entry(mem, &index->sealed, in_sealed)
328✔
129
                size += mem->tree_extent_size;
39✔
130
        return size;
289✔
131
}
132

133
/**
 * Allocate and initialize a new vinyl index object.
 *
 * For the primary index (iid == 0) the disk format is shared with
 * the space format; a secondary index gets its own key-only disk
 * format and borrows the upsert/colmask formats from the primary
 * index @pk.  On success the new index holds one reference and,
 * for a secondary index, a reference to @pk.
 *
 * @retval new index on success, NULL on OOM (diag is set).
 * Cleanup on failure unwinds allocations in reverse order via the
 * goto chain at the bottom.
 */
struct vy_index *
vy_index_new(struct vy_index_env *index_env, struct vy_cache_env *cache_env,
	     struct vy_mem_env *mem_env, struct index_def *index_def,
	     struct tuple_format *format, struct vy_index *pk)
{
	/* Bucket boundaries for the per-range run-count histogram. */
	static int64_t run_buckets[] = {
		0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 25, 50, 100,
	};

	assert(index_def->key_def->part_count > 0);
	/* A secondary index requires the primary one. */
	assert(index_def->iid == 0 || pk != NULL);

	struct vy_index *index = calloc(1, sizeof(struct vy_index));
	if (index == NULL) {
		diag_set(OutOfMemory, sizeof(struct vy_index),
			 "calloc", "struct vy_index");
		goto fail;
	}
	index->env = index_env;

	index->tree = malloc(sizeof(*index->tree));
	if (index->tree == NULL) {
		diag_set(OutOfMemory, sizeof(*index->tree),
			 "malloc", "vy_range_tree_t");
		goto fail_tree;
	}

	/* The index owns private copies of both key definitions. */
	struct key_def *key_def = key_def_dup(index_def->key_def);
	if (key_def == NULL)
		goto fail_key_def;

	struct key_def *cmp_def = key_def_dup(index_def->cmp_def);
	if (cmp_def == NULL)
		goto fail_cmp_def;

	index->cmp_def = cmp_def;
	index->key_def = key_def;
	if (index_def->iid == 0) {
		/*
		 * Disk tuples can be returned to an user from a
		 * primary key. And they must have field
		 * definitions as well as space->format tuples.
		 */
		index->disk_format = format;
		tuple_format_ref(format);
	} else {
		index->disk_format = tuple_format_new(&vy_tuple_format_vtab,
						      &cmp_def, 1, 0, NULL, 0,
						      NULL);
		if (index->disk_format == NULL)
			goto fail_format;
		/* Propagate nullability from the space format. */
		for (uint32_t i = 0; i < cmp_def->part_count; ++i) {
			uint32_t fieldno = cmp_def->parts[i].fieldno;
			index->disk_format->fields[fieldno].nullable_action =
				format->fields[fieldno].nullable_action;
		}
	}
	tuple_format_ref(index->disk_format);

	if (index_def->iid == 0) {
		index->upsert_format =
			vy_tuple_format_new_upsert(format);
		if (index->upsert_format == NULL)
			goto fail_upsert_format;
		tuple_format_ref(index->upsert_format);

		index->mem_format_with_colmask =
			vy_tuple_format_new_with_colmask(format);
		if (index->mem_format_with_colmask == NULL)
			goto fail_mem_format_with_colmask;
		tuple_format_ref(index->mem_format_with_colmask);
	} else {
		/* Secondary indexes share the pk's derived formats. */
		index->mem_format_with_colmask = pk->mem_format_with_colmask;
		index->upsert_format = pk->upsert_format;
		tuple_format_ref(index->mem_format_with_colmask);
		tuple_format_ref(index->upsert_format);
	}

	if (vy_index_stat_create(&index->stat) != 0)
		goto fail_stat;

	index->run_hist = histogram_new(run_buckets, lengthof(run_buckets));
	if (index->run_hist == NULL)
		goto fail_run_hist;

	index->mem = vy_mem_new(mem_env, *index->env->p_generation,
				cmp_def, format, index->mem_format_with_colmask,
				index->upsert_format, schema_version);
	if (index->mem == NULL)
		goto fail_mem;

	/* All allocations succeeded: plain field initialization below. */
	index->refs = 1;
	index->commit_lsn = -1;
	index->dump_lsn = -1;
	vy_cache_create(&index->cache, cache_env, cmp_def);
	rlist_create(&index->sealed);
	vy_range_tree_new(index->tree);
	vy_range_heap_create(&index->range_heap);
	rlist_create(&index->runs);
	index->pk = pk;
	if (pk != NULL)
		vy_index_ref(pk);
	index->mem_format = format;
	tuple_format_ref(index->mem_format);
	/* UINT32_MAX position means "not in the scheduler heaps". */
	index->in_dump.pos = UINT32_MAX;
	index->in_compact.pos = UINT32_MAX;
	index->space_id = index_def->space_id;
	index->id = index_def->iid;
	index->opts = index_def->opts;
	index->check_is_unique = index->opts.is_unique;
	vy_index_read_set_new(&index->read_set);

	index_env->index_count++;
	vy_index_validate_formats(index);
	return index;

fail_mem:
	histogram_delete(index->run_hist);
fail_run_hist:
	vy_index_stat_destroy(&index->stat);
fail_stat:
	tuple_format_unref(index->mem_format_with_colmask);
fail_mem_format_with_colmask:
	tuple_format_unref(index->upsert_format);
fail_upsert_format:
	tuple_format_unref(index->disk_format);
fail_format:
	free(cmp_def);
fail_cmp_def:
	free(key_def);
fail_key_def:
	free(index->tree);
fail_tree:
	free(index);
fail:
	return NULL;
}
270

271
static struct vy_range *
272
vy_range_tree_free_cb(vy_range_tree_t *t, struct vy_range *range, void *arg)
1,408✔
273
{
274
        (void)t;
275
        (void)arg;
276
        struct vy_slice *slice;
277
        rlist_foreach_entry(slice, &range->slices, in_range)
1,611✔
278
                vy_slice_wait_pinned(slice);
203✔
279
        vy_range_delete(range);
1,408✔
280
        return NULL;
1,408✔
281
}
282

283
/**
 * Destroy an index object whose reference count has dropped to
 * zero.  Frees all in-memory trees, detaches and unrefs all runs,
 * destroys the range tree, and drops every format/key-def
 * reference taken in vy_index_new().
 */
void
vy_index_delete(struct vy_index *index)
{
	assert(index->refs == 0);
	/* Must have been removed from the scheduler heaps. */
	assert(index->in_dump.pos == UINT32_MAX);
	assert(index->in_compact.pos == UINT32_MAX);
	assert(vy_index_read_set_empty(&index->read_set));
	assert(index->env->index_count > 0);

	index->env->index_count--;

	if (index->pk != NULL)
		vy_index_unref(index->pk);

	struct vy_mem *mem, *next_mem;
	rlist_foreach_entry_safe(mem, &index->sealed, in_sealed, next_mem)
		vy_mem_delete(mem);
	vy_mem_delete(index->mem);

	struct vy_run *run, *next_run;
	rlist_foreach_entry_safe(run, &index->runs, in_index, next_run)
		vy_index_remove_run(index, run);

	vy_range_tree_iter(index->tree, NULL, vy_range_tree_free_cb, NULL);
	vy_range_heap_destroy(&index->range_heap);
	tuple_format_unref(index->disk_format);
	tuple_format_unref(index->mem_format_with_colmask);
	tuple_format_unref(index->upsert_format);
	/*
	 * NOTE(review): vy_index_new() dups cmp_def unconditionally,
	 * but it is freed here only for secondary indexes — verify
	 * whether the primary index's cmp_def copy leaks.
	 */
	if (index->id > 0)
		free(index->cmp_def);
	free(index->key_def);
	histogram_delete(index->run_hist);
	vy_index_stat_destroy(&index->stat);
	vy_cache_destroy(&index->cache);
	tuple_format_unref(index->mem_format);
	free(index->tree);
	/* Poison the memory to catch use-after-free in debug builds. */
	TRASH(index);
	free(index);
}
322

323
/**
 * Exchange the on-disk state (ranges, runs, statistics, dump LSN)
 * of two index objects.  Both indexes must have no in-memory
 * statements — the asserts below enforce that, so the in-memory
 * trees themselves need not be swapped.
 */
void
vy_index_swap(struct vy_index *old_index, struct vy_index *new_index)
{
	assert(old_index->stat.memory.count.rows == 0);
	assert(new_index->stat.memory.count.rows == 0);

	SWAP(old_index->dump_lsn, new_index->dump_lsn);
	SWAP(old_index->range_count, new_index->range_count);
	SWAP(old_index->run_count, new_index->run_count);
	SWAP(old_index->stat, new_index->stat);
	SWAP(old_index->run_hist, new_index->run_hist);
	SWAP(old_index->tree, new_index->tree);
	SWAP(old_index->range_heap, new_index->range_heap);
	rlist_swap(&old_index->runs, &new_index->runs);
}
338

339
int
340
vy_index_init_range_tree(struct vy_index *index)
1,504✔
341
{
342
        struct vy_range *range = vy_range_new(vy_log_next_id(), NULL, NULL,
1,504✔
343
                                              index->cmp_def);
1,504✔
344
        if (range == NULL)
1,504✔
345
                return -1;
×
346

347
        assert(index->range_count == 0);
1,504✔
348
        vy_index_add_range(index, range);
1,504✔
349
        vy_index_acct_range(index, range);
1,504✔
350
        return 0;
1,504✔
351
}
352

353
/**
 * Create the on-disk directory for a new index (mkdir -p style)
 * and allocate its initial range.
 *
 * The path is built by vy_index_snprint_path() and each missing
 * intermediate component is created by temporarily truncating the
 * string at the next '/' — the separator is always restored before
 * returning, even on error.
 *
 * @retval 0   Success.
 * @retval -1  mkdir failed with something other than EEXIST, or
 *             range allocation failed (diag is set).
 */
int
vy_index_create(struct vy_index *index)
{
	/* Make index directory. */
	int rc;
	char path[PATH_MAX];
	vy_index_snprint_path(path, sizeof(path), index->env->path,
			      index->space_id, index->id);
	char *path_sep = path;
	while (*path_sep == '/') {
		/* Don't create root */
		++path_sep;
	}
	while ((path_sep = strchr(path_sep, '/'))) {
		/* Recursively create path hierarchy */
		*path_sep = '\0';
		rc = mkdir(path, 0777);
		if (rc == -1 && errno != EEXIST) {
			diag_set(SystemError, "failed to create directory '%s'",
				 path);
			*path_sep = '/';
			return -1;
		}
		*path_sep = '/';
		++path_sep;
	}
	/* Create the leaf directory itself. */
	rc = mkdir(path, 0777);
	if (rc == -1 && errno != EEXIST) {
		diag_set(SystemError, "failed to create directory '%s'",
			 path);
		return -1;
	}

	/* Allocate initial range. */
	return vy_index_init_range_tree(index);
}
389

390
/** vy_index_recovery_cb() argument. */
struct vy_index_recovery_cb_arg {
	/** Index being recovered. */
	struct vy_index *index;
	/** Last recovered range. */
	struct vy_range *range;
	/** Vinyl run environment. */
	struct vy_run_env *run_env;
	/**
	 * All recovered runs hashed by ID.
	 * It is needed in order not to load the same
	 * run each time a slice is created for it.
	 */
	struct mh_i64ptr_t *run_hash;
	/**
	 * True if force_recovery mode is enabled: a run whose
	 * index file fails to load is rebuilt instead of
	 * aborting recovery.
	 */
	bool force_recovery;
};
409

410
/**
 * Index recovery callback, passed to vy_recovery_load_index().
 *
 * Replays one metadata-log record against the index being
 * recovered: sets LSNs and flags, loads runs into the run hash,
 * and rebuilds ranges and slices.  Temporary begin/end key tuples
 * are decoded up front for the record types that carry them and
 * unreferenced on every exit path.
 *
 * @retval 0 on success, -1 on failure (diag is set).
 */
static int
vy_index_recovery_cb(const struct vy_log_record *record, void *cb_arg)
{
	struct vy_index_recovery_cb_arg *arg = cb_arg;
	struct vy_index *index = arg->index;
	struct vy_range *range = arg->range;
	struct vy_run_env *run_env = arg->run_env;
	struct mh_i64ptr_t *run_hash = arg->run_hash;
	bool force_recovery = arg->force_recovery;
	struct tuple_format *key_format = index->env->key_format;
	struct tuple *begin = NULL, *end = NULL;
	struct vy_run *run;
	struct vy_slice *slice;
	bool success = false;

	/* The very first record for an index must create it. */
	assert(record->type == VY_LOG_CREATE_INDEX || index->commit_lsn >= 0);

	/* Decode the optional range/slice boundary keys. */
	if (record->type == VY_LOG_INSERT_RANGE ||
	    record->type == VY_LOG_INSERT_SLICE) {
		if (record->begin != NULL) {
			begin = vy_key_from_msgpack(key_format, record->begin);
			if (begin == NULL)
				goto out;
		}
		if (record->end != NULL) {
			end = vy_key_from_msgpack(key_format, record->end);
			if (end == NULL)
				goto out;
		}
	}

	switch (record->type) {
	case VY_LOG_CREATE_INDEX:
		assert(record->index_id == index->id);
		assert(record->space_id == index->space_id);
		assert(index->commit_lsn < 0);
		assert(record->index_lsn >= 0);
		index->commit_lsn = record->index_lsn;
		break;
	case VY_LOG_DUMP_INDEX:
		assert(record->index_lsn == index->commit_lsn);
		index->dump_lsn = record->dump_lsn;
		break;
	case VY_LOG_TRUNCATE_INDEX:
		assert(record->index_lsn == index->commit_lsn);
		index->truncate_count = record->truncate_count;
		break;
	case VY_LOG_DROP_INDEX:
		assert(record->index_lsn == index->commit_lsn);
		index->is_dropped = true;
		/*
		 * If the index was dropped, we don't need to replay
		 * truncate (see vy_prepare_truncate_space()).
		 */
		index->truncate_count = UINT64_MAX;
		break;
	case VY_LOG_PREPARE_RUN:
		break;
	case VY_LOG_CREATE_RUN:
		if (record->is_dropped)
			break;
		assert(record->index_lsn == index->commit_lsn);
		run = vy_run_new(run_env, record->run_id);
		if (run == NULL)
			goto out;
		run->dump_lsn = record->dump_lsn;
		/*
		 * Load the run's index file; in force_recovery mode
		 * a broken index file is rebuilt from the data file
		 * instead of failing recovery.
		 */
		if (vy_run_recover(run, index->env->path,
				   index->space_id, index->id) != 0 &&
		     (!force_recovery ||
		     vy_run_rebuild_index(run, index->env->path,
					  index->space_id, index->id,
					  index->cmp_def, index->key_def,
					  index->mem_format,
					  index->upsert_format,
					  &index->opts) != 0)) {
			vy_run_unref(run);
			goto out;
		}
		/* The hash holds one reference per stored run. */
		struct mh_i64ptr_node_t node = { run->id, run };
		if (mh_i64ptr_put(run_hash, &node,
				  NULL, NULL) == mh_end(run_hash)) {
			diag_set(OutOfMemory, 0,
				 "mh_i64ptr_put", "mh_i64ptr_node_t");
			vy_run_unref(run);
			goto out;
		}
		break;
	case VY_LOG_DROP_RUN:
		break;
	case VY_LOG_INSERT_RANGE:
		assert(record->index_lsn == index->commit_lsn);
		range = vy_range_new(record->range_id, begin, end,
				     index->cmp_def);
		if (range == NULL)
			goto out;
		/* Reject a degenerate range: begin must precede end. */
		if (range->begin != NULL && range->end != NULL &&
		    vy_key_compare(range->begin, range->end,
				   index->cmp_def) >= 0) {
			diag_set(ClientError, ER_INVALID_VYLOG_FILE,
				 tt_sprintf("begin >= end for range id %lld",
					    (long long)range->id));
			vy_range_delete(range);
			goto out;
		}
		vy_index_add_range(index, range);
		/* Remember the range for subsequent INSERT_SLICE records. */
		arg->range = range;
		break;
	case VY_LOG_INSERT_SLICE:
		assert(range != NULL);
		assert(range->id == record->range_id);
		/* The run must have been loaded by CREATE_RUN earlier. */
		mh_int_t k = mh_i64ptr_find(run_hash, record->run_id, NULL);
		assert(k != mh_end(run_hash));
		run = mh_i64ptr_node(run_hash, k)->val;
		slice = vy_slice_new(record->slice_id, run, begin, end,
				     index->cmp_def);
		if (slice == NULL)
			goto out;
		vy_range_add_slice(range, slice);
		break;
	default:
		unreachable();
	}
	success = true;
out:
	if (begin != NULL)
		tuple_unref(begin);
	if (end != NULL)
		tuple_unref(end);
	return success ? 0 : -1;
}
541

542
/**
 * Recover an index from the vinyl metadata log.
 *
 * Replays all records for this index via vy_index_recovery_cb(),
 * attaches the referenced runs, and validates that the recovered
 * range tree covers the whole key space with neither holes nor
 * overlaps.  An index missing from the log is acceptable only
 * outside checkpoint recovery and gets a fresh initial range.
 *
 * @retval 0 on success, -1 on failure (diag is set).
 */
int
vy_index_recover(struct vy_index *index, struct vy_recovery *recovery,
		 struct vy_run_env *run_env, int64_t lsn,
		 bool is_checkpoint_recovery, bool force_recovery)
{
	assert(index->range_count == 0);

	struct vy_index_recovery_cb_arg arg = {
		.index = index,
		.range = NULL,
		.run_env = run_env,
		.run_hash = NULL,
		.force_recovery = force_recovery,
	};
	arg.run_hash = mh_i64ptr_new();
	if (arg.run_hash == NULL) {
		diag_set(OutOfMemory, 0, "mh_i64ptr_new", "mh_i64ptr_t");
		return -1;
	}

	/*
	 * Backward compatibility fixup: historically, we used
	 * box.info.signature for LSN of index creation, which
	 * lags behind the LSN of the record that created the
	 * index by 1. So for legacy indexes use the LSN from
	 * index options.
	 */
	if (index->opts.lsn != 0)
		lsn = index->opts.lsn;

	int rc = vy_recovery_load_index(recovery, index->space_id, index->id,
					lsn, is_checkpoint_recovery,
					vy_index_recovery_cb, &arg);

	/*
	 * Attach every run referenced by at least one slice
	 * (refs > 1: hash reference plus slice references) and
	 * flag unused runs as log corruption.
	 */
	mh_int_t k;
	mh_foreach(arg.run_hash, k) {
		struct vy_run *run = mh_i64ptr_node(arg.run_hash, k)->val;
		if (run->refs > 1)
			vy_index_add_run(index, run);
		if (run->refs == 1 && rc == 0) {
			diag_set(ClientError, ER_INVALID_VYLOG_FILE,
				 tt_sprintf("Unused run %lld in index %lld",
					    (long long)run->id,
					    (long long)index->commit_lsn));
			rc = -1;
			/*
			 * Continue the loop to unreference
			 * all runs in the hash.
			 */
		}
		/* Drop the reference held by the hash. */
		vy_run_unref(run);
	}
	mh_i64ptr_delete(arg.run_hash);

	if (rc != 0) {
		/* Recovery callback failed. */
		return -1;
	}

	if (index->commit_lsn < 0) {
		/* Index was not found in the metadata log. */
		if (is_checkpoint_recovery) {
			/*
			 * All indexes created from snapshot rows must
			 * be present in vylog, because snapshot can
			 * only succeed if vylog has been successfully
			 * flushed.
			 */
			diag_set(ClientError, ER_INVALID_VYLOG_FILE,
				 tt_sprintf("Index %lld not found",
					    (long long)index->commit_lsn));
			return -1;
		}
		/*
		 * If we failed to log index creation before restart,
		 * we won't find it in the log on recovery. This is
		 * OK as the index doesn't have any runs in this case.
		 * We will retry to log index in vy_index_commit_create().
		 * For now, just create the initial range.
		 */
		return vy_index_init_range_tree(index);
	}

	if (index->is_dropped) {
		/*
		 * Initial range is not stored in the metadata log
		 * for dropped indexes, but we need it for recovery.
		 */
		return vy_index_init_range_tree(index);
	}

	/*
	 * Account ranges to the index and check that the range tree
	 * does not have holes or overlaps.
	 */
	struct vy_range *range, *prev = NULL;
	for (range = vy_range_tree_first(index->tree); range != NULL;
	     prev = range, range = vy_range_tree_next(index->tree, range)) {
		if (prev == NULL && range->begin != NULL) {
			diag_set(ClientError, ER_INVALID_VYLOG_FILE,
				 tt_sprintf("Range %lld is leftmost but "
					    "starts with a finite key",
					    (long long)range->id));
			return -1;
		}
		/* Adjacent ranges must meet exactly: prev->end == begin. */
		int cmp = 0;
		if (prev != NULL &&
		    (prev->end == NULL || range->begin == NULL ||
		     (cmp = vy_key_compare(prev->end, range->begin,
					   index->cmp_def)) != 0)) {
			const char *errmsg = cmp > 0 ?
				"Nearby ranges %lld and %lld overlap" :
				"Keys between ranges %lld and %lld not spanned";
			diag_set(ClientError, ER_INVALID_VYLOG_FILE,
				 tt_sprintf(errmsg,
					    (long long)prev->id,
					    (long long)range->id));
			return -1;
		}
		vy_index_acct_range(index, range);
	}
	if (prev == NULL) {
		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
			 tt_sprintf("Index %lld has empty range tree",
				    (long long)index->commit_lsn));
		return -1;
	}
	if (prev->end != NULL) {
		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
			 tt_sprintf("Range %lld is rightmost but "
				    "ends with a finite key",
				    (long long)prev->id));
		return -1;
	}
	return 0;
}
679

680
int64_t
681
vy_index_generation(struct vy_index *index)
22,636✔
682
{
683
        struct vy_mem *oldest = rlist_empty(&index->sealed) ? index->mem :
25,325✔
684
                rlist_last_entry(&index->sealed, struct vy_mem, in_sealed);
2,689✔
685
        return oldest->generation;
22,636✔
686
}
687

688
int
689
vy_index_compact_priority(struct vy_index *index)
20,080✔
690
{
691
        struct heap_node *n = vy_range_heap_top(&index->range_heap);
20,080✔
692
        if (n == NULL)
20,080✔
693
                return 0;
1,025✔
694
        struct vy_range *range = container_of(n, struct vy_range, heap_node);
19,055✔
695
        return range->compact_priority;
19,055✔
696
}
697

698
/**
 * Attach a run to the index: link it into the run list and add
 * its statement, bloom filter and page index sizes to both the
 * per-index and environment-wide accounting.
 * Must mirror vy_index_remove_run() exactly.
 */
void
vy_index_add_run(struct vy_index *index, struct vy_run *run)
{
	assert(rlist_empty(&run->in_index));
	rlist_add_entry(&index->runs, run, in_index);
	index->run_count++;
	vy_disk_stmt_counter_add(&index->stat.disk.count, &run->count);

	index->bloom_size += vy_run_bloom_size(run);
	index->page_index_size += run->page_index_size;

	index->env->bloom_size += vy_run_bloom_size(run);
	index->env->page_index_size += run->page_index_size;
}
712

713
/**
 * Detach a run from the index: unlink it from the run list and
 * subtract its statement, bloom filter and page index sizes from
 * both the per-index and environment-wide accounting.
 * Exact inverse of vy_index_add_run().
 */
void
vy_index_remove_run(struct vy_index *index, struct vy_run *run)
{
	assert(index->run_count > 0);
	assert(!rlist_empty(&run->in_index));
	rlist_del_entry(run, in_index);
	index->run_count--;
	vy_disk_stmt_counter_sub(&index->stat.disk.count, &run->count);

	index->bloom_size -= vy_run_bloom_size(run);
	index->page_index_size -= run->page_index_size;

	index->env->bloom_size -= vy_run_bloom_size(run);
	index->env->page_index_size -= run->page_index_size;
}
728

729
void
730
vy_index_add_range(struct vy_index *index, struct vy_range *range)
1,631✔
731
{
732
        assert(range->heap_node.pos == UINT32_MAX);
1,631✔
733
        vy_range_heap_insert(&index->range_heap, &range->heap_node);
1,631✔
734
        vy_range_tree_insert(index->tree, range);
1,631✔
735
        index->range_count++;
1,631✔
736
}
1,631✔
737

738
void
739
vy_index_remove_range(struct vy_index *index, struct vy_range *range)
17✔
740
{
741
        assert(range->heap_node.pos != UINT32_MAX);
17✔
742
        vy_range_heap_delete(&index->range_heap, &range->heap_node);
17✔
743
        vy_range_tree_remove(index->tree, range);
17✔
744
        index->range_count--;
17✔
745
}
17✔
746

747
void
vy_index_acct_range(struct vy_index *index, struct vy_range *range)
{
	/* Account the range's slice count in the run histogram. */
	histogram_collect(index->run_hist, range->slice_count);
}
752

753
void
vy_index_unacct_range(struct vy_index *index, struct vy_range *range)
{
	/* Remove the range's slice count from the run histogram. */
	histogram_discard(index->run_hist, range->slice_count);
}
758

759
int
760
vy_index_rotate_mem(struct vy_index *index)
3,013✔
761
{
762
        struct vy_mem *mem;
763

764
        assert(index->mem != NULL);
3,013✔
765
        mem = vy_mem_new(index->mem->env, *index->env->p_generation,
6,026✔
766
                         index->cmp_def, index->mem_format,
3,013✔
767
                         index->mem_format_with_colmask,
768
                         index->upsert_format, schema_version);
769
        if (mem == NULL)
3,013✔
770
                return -1;
×
771

772
        rlist_add_entry(&index->sealed, index->mem, in_sealed);
3,013✔
773
        index->mem = mem;
3,013✔
774
        index->mem_list_version++;
3,013✔
775
        return 0;
3,013✔
776
}
777

778
void
779
vy_index_delete_mem(struct vy_index *index, struct vy_mem *mem)
2,060✔
780
{
781
        assert(!rlist_empty(&mem->in_sealed));
2,060✔
782
        rlist_del_entry(mem, in_sealed);
2,060✔
783
        vy_stmt_counter_sub(&index->stat.memory.count, &mem->count);
2,060✔
784
        vy_mem_delete(mem);
2,060✔
785
        index->mem_list_version++;
2,060✔
786
}
2,060✔
787

788
/**
 * Insert a statement into the in-memory tree @a mem of @a index.
 *
 * If @a *region_stmt is NULL, the statement is first duplicated on
 * the lsregion allocator of @a mem and the copy is returned via
 * @a region_stmt (presumably so the caller can reuse a single copy
 * across several insertions — confirm against callers).
 *
 * @param index       Index to account the statement in.
 * @param mem         In-memory tree to insert into.
 * @param stmt        Statement to insert; must be refable.
 * @param region_stmt In/out: lsregion duplicate of @a stmt,
 *                    allocated on demand.
 * @retval 0 on success, -1 on allocation failure or if the tuple
 *         format of @a stmt no longer matches the formats of
 *         @a mem (changed by DDL) — ER_TRANSACTION_CONFLICT is
 *         set in the latter case.
 */
int
vy_index_set(struct vy_index *index, struct vy_mem *mem,
	     const struct tuple *stmt, const struct tuple **region_stmt)
{
	assert(vy_stmt_is_refable(stmt));
	assert(*region_stmt == NULL || !vy_stmt_is_refable(*region_stmt));

	/* Allocate region_stmt on demand. */
	if (*region_stmt == NULL) {
		*region_stmt = vy_stmt_dup_lsregion(stmt, &mem->env->allocator,
						    mem->generation);
		if (*region_stmt == NULL)
			return -1;
	}

	/* We can't free region_stmt below, so let's add it to the stats */
	index->stat.memory.count.bytes += tuple_size(stmt);

	uint32_t format_id = stmt->format_id;
	if (vy_stmt_type(*region_stmt) != IPROTO_UPSERT) {
		/* Abort transaction if format was changed by DDL */
		if (format_id != tuple_format_id(mem->format_with_colmask) &&
		    format_id != tuple_format_id(mem->format)) {
			diag_set(ClientError, ER_TRANSACTION_CONFLICT);
			return -1;
		}
		return vy_mem_insert(mem, *region_stmt);
	} else {
		/* Abort transaction if format was changed by DDL */
		if (format_id != tuple_format_id(mem->upsert_format)) {
			diag_set(ClientError, ER_TRANSACTION_CONFLICT);
			return -1;
		}
		return vy_mem_insert_upsert(mem, *region_stmt);
	}
}
824

825
/**
 * Calculate and record the number of sequential upserts, squash
 * immediately or schedule upsert process if needed.
 * Additional handler used in vy_index_commit_stmt() for UPSERT
 * statements.
 *
 * @param index Index the statement was committed to.
 * @param mem   In-memory tree where the statement was saved.
 * @param stmt  UPSERT statement to squash.
 */
static void
vy_index_commit_upsert(struct vy_index *index, struct vy_mem *mem,
		       const struct tuple *stmt)
{
	assert(vy_stmt_type(stmt) == IPROTO_UPSERT);
	assert(vy_stmt_lsn(stmt) < MAX_LSN);
	/*
	 * UPSERT is enabled only for the spaces with the single
	 * index.
	 */
	assert(index->id == 0);

	const struct tuple *older;
	int64_t lsn = vy_stmt_lsn(stmt);
	uint8_t n_upserts = vy_stmt_n_upserts(stmt);
	/*
	 * If there are a lot of successive upserts for the same key,
	 * select might take too long to squash them all. So once the
	 * number of upserts exceeds a certain threshold, we schedule
	 * a fiber to merge them and insert the resulting statement
	 * after the latest upsert.
	 */
	if (n_upserts == VY_UPSERT_INF) {
		/*
		 * If UPSERT has n_upserts > VY_UPSERT_THRESHOLD,
		 * it means the mem has older UPSERTs for the same
		 * key which already are being processed in the
		 * squashing task. At the end, the squashing task
		 * will merge its result with this UPSERT
		 * automatically.
		 */
		return;
	}
	if (n_upserts == VY_UPSERT_THRESHOLD) {
		/*
		 * Start single squashing task per one-mem and
		 * one-key continuous UPSERTs sequence.
		 */
#ifndef NDEBUG
		older = vy_mem_older_lsn(mem, stmt);
		assert(older != NULL && vy_stmt_type(older) == IPROTO_UPSERT &&
		       vy_stmt_n_upserts(older) == VY_UPSERT_THRESHOLD - 1);
#endif
		if (index->env->upsert_thresh_cb == NULL) {
			/* Squash callback is not installed. */
			return;
		}

		struct tuple *dup = vy_stmt_dup(stmt, index->upsert_format);
		if (dup != NULL) {
			index->env->upsert_thresh_cb(index, dup,
					index->env->upsert_thresh_arg);
			tuple_unref(dup);
		}
		/*
		 * Ignore dup == NULL, because the optimization is
		 * good, but is not necessary.
		 */
		return;
	}

	/*
	 * If there are no other mems and runs and n_upserts == 0,
	 * then we can turn the UPSERT into the REPLACE.
	 */
	if (n_upserts == 0 &&
	    index->stat.memory.count.rows == index->mem->count.rows &&
	    index->run_count == 0) {
		older = vy_mem_older_lsn(mem, stmt);
		assert(older == NULL || vy_stmt_type(older) != IPROTO_UPSERT);
		struct tuple *upserted =
			vy_apply_upsert(stmt, older, index->cmp_def,
					index->mem_format,
					index->upsert_format, false);
		index->stat.upsert.applied++;

		if (upserted == NULL) {
			/* OOM */
			diag_clear(diag_get());
			return;
		}
		int64_t upserted_lsn = vy_stmt_lsn(upserted);
		if (upserted_lsn != lsn) {
			/**
			 * This could only happen if the upsert completely
			 * failed and the old tuple was returned.
			 * In this case we shouldn't insert the same replace
			 * again.
			 */
			assert(older == NULL ||
			       upserted_lsn == vy_stmt_lsn(older));
			tuple_unref(upserted);
			return;
		}
		assert(older == NULL || upserted_lsn != vy_stmt_lsn(older));
		assert(vy_stmt_type(upserted) == IPROTO_REPLACE);

		const struct tuple *region_stmt =
			vy_stmt_dup_lsregion(upserted, &mem->env->allocator,
					     mem->generation);
		if (region_stmt == NULL) {
			/* OOM */
			tuple_unref(upserted);
			diag_clear(diag_get());
			return;
		}

		int rc = vy_index_set(index, mem, upserted, &region_stmt);
		/**
		 * Since we have already allocated mem statement and
		 * now we replacing one statement with another, the
		 * vy_index_set() cannot fail.
		 */
		assert(rc == 0); (void)rc;
		tuple_unref(upserted);
		vy_mem_commit_stmt(mem, region_stmt);
		index->stat.upsert.squashed++;
	}
}
954

955
/**
 * Commit a statement previously inserted into @a mem to @a index:
 * confirm it in the in-memory tree, bump the memory row counter,
 * trigger the UPSERT squashing handler for UPSERT statements,
 * account the write in the put statistics and invalidate the
 * corresponding cache element.
 */
void
vy_index_commit_stmt(struct vy_index *index, struct vy_mem *mem,
		     const struct tuple *stmt)
{
	vy_mem_commit_stmt(mem, stmt);

	index->stat.memory.count.rows++;

	if (vy_stmt_type(stmt) == IPROTO_UPSERT)
		vy_index_commit_upsert(index, mem, stmt);

	vy_stmt_counter_acct_tuple(&index->stat.put, stmt);

	/* Invalidate cache element. */
	vy_cache_on_write(&index->cache, stmt, NULL);
}
971

972
/**
 * Roll back a statement previously inserted into @a mem: remove it
 * from the in-memory tree and invalidate the corresponding cache
 * element.
 */
void
vy_index_rollback_stmt(struct vy_index *index, struct vy_mem *mem,
		       const struct tuple *stmt)
{
	vy_mem_rollback_stmt(mem, stmt);

	/* Invalidate cache element. */
	vy_cache_on_write(&index->cache, stmt, NULL);
}
981

982
bool
983
vy_index_split_range(struct vy_index *index, struct vy_range *range)
355✔
984
{
985
        struct tuple_format *key_format = index->env->key_format;
355✔
986

987
        const char *split_key_raw;
988
        if (!vy_range_needs_split(range, &index->opts, &split_key_raw))
355✔
989
                return false;
697✔
990

991
        /* Split a range in two parts. */
992
        const int n_parts = 2;
13✔
993

994
        /*
995
         * Determine new ranges' boundaries.
996
         */
997
        struct tuple *split_key = vy_key_from_msgpack(key_format,
13✔
998
                                                      split_key_raw);
999
        if (split_key == NULL)
13✔
1000
                goto fail;
×
1001

1002
        struct tuple *keys[3];
1003
        keys[0] = range->begin;
13✔
1004
        keys[1] = split_key;
13✔
1005
        keys[2] = range->end;
13✔
1006

1007
        /*
1008
         * Allocate new ranges and create slices of
1009
         * the old range's runs for them.
1010
         */
1011
        struct vy_slice *slice, *new_slice;
1012
        struct vy_range *part, *parts[2] = {NULL, };
13✔
1013
        for (int i = 0; i < n_parts; i++) {
39✔
1014
                part = vy_range_new(vy_log_next_id(), keys[i], keys[i + 1],
26✔
1015
                                    index->cmp_def);
26✔
1016
                if (part == NULL)
26✔
1017
                        goto fail;
×
1018
                parts[i] = part;
26✔
1019
                /*
1020
                 * vy_range_add_slice() adds a slice to the list head,
1021
                 * so to preserve the order of the slices list, we have
1022
                 * to iterate backward.
1023
                 */
1024
                rlist_foreach_entry_reverse(slice, &range->slices, in_range) {
104✔
1025
                        if (vy_slice_cut(slice, vy_log_next_id(), part->begin,
78✔
1026
                                         part->end, index->cmp_def,
78✔
1027
                                         &new_slice) != 0)
1028
                                goto fail;
×
1029
                        if (new_slice != NULL)
78✔
1030
                                vy_range_add_slice(part, new_slice);
78✔
1031
                }
1032
                part->compact_priority = range->compact_priority;
26✔
1033
        }
1034

1035
        /*
1036
         * Log change in metadata.
1037
         */
1038
        vy_log_tx_begin();
13✔
1039
        rlist_foreach_entry(slice, &range->slices, in_range)
52✔
1040
                vy_log_delete_slice(slice->id);
39✔
1041
        vy_log_delete_range(range->id);
13✔
1042
        for (int i = 0; i < n_parts; i++) {
39✔
1043
                part = parts[i];
26✔
1044
                vy_log_insert_range(index->commit_lsn, part->id,
52✔
1045
                                    tuple_data_or_null(part->begin),
26✔
1046
                                    tuple_data_or_null(part->end));
26✔
1047
                rlist_foreach_entry(slice, &part->slices, in_range)
104✔
1048
                        vy_log_insert_slice(part->id, slice->run->id, slice->id,
156✔
1049
                                            tuple_data_or_null(slice->begin),
78✔
1050
                                            tuple_data_or_null(slice->end));
78✔
1051
        }
1052
        if (vy_log_tx_commit() < 0)
13✔
1053
                goto fail;
×
1054

1055
        /*
1056
         * Replace the old range in the index.
1057
         */
1058
        vy_index_unacct_range(index, range);
13✔
1059
        vy_index_remove_range(index, range);
13✔
1060

1061
        for (int i = 0; i < n_parts; i++) {
39✔
1062
                part = parts[i];
26✔
1063
                vy_index_add_range(index, part);
26✔
1064
                vy_index_acct_range(index, part);
26✔
1065
        }
1066
        index->range_tree_version++;
13✔
1067

1068
        say_info("%s: split range %s by key %s", vy_index_name(index),
13✔
1069
                 vy_range_str(range), tuple_str(split_key));
1070

1071
        rlist_foreach_entry(slice, &range->slices, in_range)
52✔
1072
                vy_slice_wait_pinned(slice);
39✔
1073
        vy_range_delete(range);
13✔
1074
        tuple_unref(split_key);
13✔
1075
        return true;
13✔
1076
fail:
1077
        for (int i = 0; i < n_parts; i++) {
×
1078
                if (parts[i] != NULL)
×
1079
                        vy_range_delete(parts[i]);
×
1080
        }
1081
        if (split_key != NULL)
×
1082
                tuple_unref(split_key);
×
1083

1084
        diag_log();
×
1085
        say_error("%s: failed to split range %s",
×
1086
                  vy_index_name(index), vy_range_str(range));
1087
        return false;
×
1088
}
1089

1090
/**
 * Coalesce a run of adjacent ranges (chosen by
 * vy_range_needs_coalesce()) into one new range, moving all their
 * run slices to the result. The change is first committed to the
 * metadata log; on log failure nothing in memory is modified.
 *
 * @retval true  Ranges were coalesced.
 * @retval false Coalescing is not needed, or an error occurred
 *               (the error is logged and swallowed).
 */
bool
vy_index_coalesce_range(struct vy_index *index, struct vy_range *range)
{
	struct vy_range *first, *last;
	if (!vy_range_needs_coalesce(range, index->tree, &index->opts,
				     &first, &last))
		return false;

	struct vy_range *result = vy_range_new(vy_log_next_id(),
			first->begin, last->end, index->cmp_def);
	if (result == NULL)
		goto fail_range;

	struct vy_range *it;
	struct vy_range *end = vy_range_tree_next(index->tree, last);

	/*
	 * Log change in metadata.
	 */
	vy_log_tx_begin();
	vy_log_insert_range(index->commit_lsn, result->id,
			    tuple_data_or_null(result->begin),
			    tuple_data_or_null(result->end));
	for (it = first; it != end; it = vy_range_tree_next(index->tree, it)) {
		struct vy_slice *slice;
		/* Re-parent each slice from the old range to the result. */
		rlist_foreach_entry(slice, &it->slices, in_range)
			vy_log_delete_slice(slice->id);
		vy_log_delete_range(it->id);
		rlist_foreach_entry(slice, &it->slices, in_range) {
			vy_log_insert_slice(result->id, slice->run->id, slice->id,
					    tuple_data_or_null(slice->begin),
					    tuple_data_or_null(slice->end));
		}
	}
	if (vy_log_tx_commit() < 0)
		goto fail_commit;

	/*
	 * Move run slices of the coalesced ranges to the
	 * resulting range and delete the former.
	 */
	it = first;
	while (it != end) {
		struct vy_range *next = vy_range_tree_next(index->tree, it);
		vy_index_unacct_range(index, it);
		vy_index_remove_range(index, it);
		rlist_splice(&result->slices, &it->slices);
		result->slice_count += it->slice_count;
		vy_disk_stmt_counter_add(&result->count, &it->count);
		vy_range_delete(it);
		it = next;
	}
	/*
	 * Coalescing increases read amplification and breaks the log
	 * structured layout of the run list, so, although we could
	 * leave the resulting range as it is, we'd better compact it
	 * as soon as we can.
	 */
	result->compact_priority = result->slice_count;
	vy_index_acct_range(index, result);
	vy_index_add_range(index, result);
	index->range_tree_version++;

	say_info("%s: coalesced ranges %s",
		 vy_index_name(index), vy_range_str(result));
	return true;

fail_commit:
	vy_range_delete(result);
fail_range:
	diag_log();
	say_error("%s: failed to coalesce range %s",
		  vy_index_name(index), vy_range_str(range));
	return false;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc