• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / tarantool / 9253825433

27 May 2024 11:10AM UTC coverage: 85.789% (-0.006%) from 85.795%
9253825433

Pull #10020

github

web-flow
Merge 051c335d9 into 69f2ddfcc
Pull Request #10020: [backport 2.11] cmake: bump OpenSSL version 1.1.1 -> 3.2.1

62596 of 113824 branches covered (54.99%)

93468 of 108951 relevant lines covered (85.79%)

2595923.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.12
/src/box/vy_run.c
1
/*
2
 * Copyright 2010-2017, Tarantool AUTHORS, please see AUTHORS file.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 * 1. Redistributions of source code must retain the above
9
 *    copyright notice, this list of conditions and the
10
 *    following disclaimer.
11
 *
12
 * 2. Redistributions in binary form must reproduce the above
13
 *    copyright notice, this list of conditions and the following
14
 *    disclaimer in the documentation and/or other materials
15
 *    provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
18
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
19
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
21
 * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
22
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
25
 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
28
 * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
29
 * SUCH DAMAGE.
30
 */
31
#include "vy_run.h"
32

33
#include <zstd.h>
34

35
#include "fiber.h"
36
#include "fiber_cond.h"
37
#include "fio.h"
38
#include "cbus.h"
39
#include "memory.h"
40
#include "coio_file.h"
41

42
#include "replication.h"
43
#include "tuple_bloom.h"
44
#include "xlog.h"
45
#include "xrow.h"
46
#include "vy_history.h"
47

48
/**
 * Bitmask of the page info keys that must be present in a page info
 * record: vy_page_info_decode() starts from this mask, clears the bit
 * of every key it meets and fails if any bit is still set at the end.
 * The shifts are done in 64 bits so that they match the mask type.
 */
static const uint64_t vy_page_info_key_map =
	(1ULL << VY_PAGE_INFO_OFFSET) |
	(1ULL << VY_PAGE_INFO_SIZE) |
	(1ULL << VY_PAGE_INFO_UNPACKED_SIZE) |
	(1ULL << VY_PAGE_INFO_ROW_COUNT) |
	(1ULL << VY_PAGE_INFO_MIN_KEY) |
	(1ULL << VY_PAGE_INFO_ROW_INDEX_OFFSET);
54

55
/**
 * Bitmask of the run info keys that must be present in a run info
 * record: vy_run_info_decode() starts from this mask, clears the bit
 * of every key it meets and fails if any bit is still set at the end.
 * The shifts are done in 64 bits so that they match the mask type.
 */
static const uint64_t vy_run_info_key_map =
	(1ULL << VY_RUN_INFO_MIN_KEY) |
	(1ULL << VY_RUN_INFO_MAX_KEY) |
	(1ULL << VY_RUN_INFO_MIN_LSN) |
	(1ULL << VY_RUN_INFO_MAX_LSN) |
	(1ULL << VY_RUN_INFO_PAGE_COUNT);
60

61
/** xlog meta type for .run files */
62
#define XLOG_META_TYPE_RUN "RUN"
63

64
/** xlog meta type for .index files */
65
#define XLOG_META_TYPE_INDEX "INDEX"
66

67
const char *vy_file_suffix[] = {
68
        "index",                        /* VY_FILE_INDEX */
69
        "index" inprogress_suffix,         /* VY_FILE_INDEX_INPROGRESS */
70
        "run",                                /* VY_FILE_RUN */
71
        "run" inprogress_suffix,         /* VY_FILE_RUN_INPROGRESS */
72
};
73

74
/* sync run and index files every 16 MB */
75
#define VY_RUN_SYNC_INTERVAL (1 << 24)
76

77
/**
78
 * We read runs in background threads so as not to stall tx.
79
 * This structure represents such a thread.
80
 */
81
struct vy_run_reader {
82
        /** Thread that processes read requests. */
83
        struct cord cord;
84
        /** Pipe from tx to the reader thread. */
85
        struct cpipe reader_pipe;
86
        /** Pipe from the reader thread to tx. */
87
        struct cpipe tx_pipe;
88
};
89

90
/** Cbus task for vinyl page read. */
91
struct vy_page_read_task {
92
        /** parent */
93
        struct cbus_call_msg base;
94
        /** vinyl page metadata */
95
        struct vy_page_info *page_info;
96
        /** vy_run with fd - ref. counted */
97
        struct vy_run *run;
98
        /** key to lookup within the page */
99
        struct vy_entry key;
100
        /** iterator type (needed for for key lookup) */
101
        enum iterator_type iterator_type;
102
        /** key definition (needed for key lookup) */
103
        struct key_def *cmp_def;
104
        /** disk format (needed for key lookup) */
105
        struct tuple_format *format;
106
        /** [out] position of the key in the page */
107
        uint32_t pos_in_page;
108
        /** [out] true if key was found in the page */
109
        bool equal_found;
110
        /** [out] resulting vinyl page */
111
        struct vy_page *page;
112
};
113

114
/**
 * Destructor registered for the env->zdctx_key thread-local variable:
 * hands the stored value over to ZSTD_freeDStream().
 *
 * @param arg  Value stored under the key; must not be NULL.
 */
static void
vy_free_zdctx(void *arg)
{
	void *zdctx = arg;

	assert(zdctx != NULL);
	ZSTD_freeDStream(zdctx);
}
376✔
121

122
/** Run reader thread function. */
123
static int
124
vy_run_reader_f(va_list ap)
939✔
125
{
126
        struct vy_run_reader *reader = va_arg(ap, struct vy_run_reader *);
939✔
127
        struct cbus_endpoint endpoint;
128

129
        cpipe_create(&reader->tx_pipe, "tx_prio");
939!
130
        cbus_endpoint_create(&endpoint, cord_name(cord()),
939!
131
                             fiber_schedule_cb, fiber());
939!
132
        cbus_loop(&endpoint);
939!
133
        cbus_endpoint_destroy(&endpoint, cbus_process);
×
134
        cpipe_destroy(&reader->tx_pipe);
×
135
        return 0;
×
136
}
137

138
/** Start run reader threads. */
139
static void
140
vy_run_env_start_readers(struct vy_run_env *env)
797✔
141
{
142
        assert(env->reader_pool == NULL);
797!
143
        assert(env->reader_pool_size > 0);
797!
144

145
        env->reader_pool = calloc(env->reader_pool_size,
797✔
146
                                  sizeof(*env->reader_pool));
147
        if (env->reader_pool == NULL)
797!
148
                panic("failed to allocate vinyl reader thread pool");
×
149

150
        for (int i = 0; i < env->reader_pool_size; i++) {
1,736✔
151
                struct vy_run_reader *reader = &env->reader_pool[i];
939✔
152
                char name[FIBER_NAME_MAX];
153

154
                snprintf(name, sizeof(name), "vinyl.reader.%d", i);
939✔
155
                if (cord_costart(&reader->cord, name,
939!
156
                                 vy_run_reader_f, reader) != 0)
157
                        panic("failed to start vinyl reader thread");
×
158
                cpipe_create(&reader->reader_pipe, name);
939!
159
        }
160
        env->next_reader = 0;
797✔
161
}
797✔
162

163
/** Join run reader threads. */
164
static void
165
vy_run_env_stop_readers(struct vy_run_env *env)
797✔
166
{
167
        for (int i = 0; i < env->reader_pool_size; i++) {
1,736✔
168
                struct vy_run_reader *reader = &env->reader_pool[i];
939✔
169
                cord_cancel_and_join(&reader->cord);
939✔
170
        }
171
        free(env->reader_pool);
797✔
172
}
797✔
173

174
/**
175
 * Initialize vinyl run environment
176
 */
177
void
178
vy_run_env_create(struct vy_run_env *env, int read_threads)
3,261✔
179
{
180
        memset(env, 0, sizeof(*env));
3,261✔
181
        env->reader_pool_size = read_threads;
3,261✔
182
        tt_pthread_key_create(&env->zdctx_key, vy_free_zdctx);
3,261!
183
        mempool_create(&env->read_task_pool, cord_slab_cache(),
3,261✔
184
                       sizeof(struct vy_page_read_task));
185
        env->initial_join = false;
3,261✔
186
}
3,261✔
187

188
/**
189
 * Destroy vinyl run environment
190
 */
191
void
192
vy_run_env_destroy(struct vy_run_env *env)
3,220✔
193
{
194
        if (env->reader_pool != NULL)
3,220✔
195
                vy_run_env_stop_readers(env);
797✔
196
        mempool_destroy(&env->read_task_pool);
3,220✔
197
        tt_pthread_key_delete(env->zdctx_key);
3,220!
198
}
3,220✔
199

200
/**
201
 * Enable coio reads for a vinyl run environment.
202
 */
203
void
204
vy_run_env_enable_coio(struct vy_run_env *env)
8,477✔
205
{
206
        if (env->reader_pool != NULL)
8,477✔
207
                return; /* already enabled */
7,680✔
208
        vy_run_env_start_readers(env);
797✔
209
}
210

211
/**
212
 * Execute a task on behalf of a reader thread.
213
 */
214
static int
215
vy_run_env_coio_call(struct vy_run_env *env, struct cbus_call_msg *msg,
103,827✔
216
                     cbus_call_f func)
217
{
218
        /* Optimization: use blocking I/O during WAL recovery. */
219
        if (env->reader_pool == NULL)
103,827✔
220
                return func(msg);
73✔
221

222
        /* Pick a reader thread. */
223
        struct vy_run_reader *reader;
224
        reader = &env->reader_pool[env->next_reader++];
103,754✔
225
        env->next_reader %= env->reader_pool_size;
103,754✔
226

227
        /* Post the task to the reader thread. */
228
        if (cbus_call(&reader->reader_pipe, &reader->tx_pipe, msg, func) != 0)
103,754✔
229
                return -1;
5✔
230

231
        if (fiber_is_cancelled()) {
103,747✔
232
                diag_set(FiberIsCancelled);
2!
233
                return -1;
2✔
234
        }
235
        return 0;
103,745✔
236
}
237

238
/**
239
 * Initialize page info struct
240
 *
241
 * @retval 0 for Success
242
 * @retval -1 for error
243
 */
244
static int
245
vy_page_info_create(struct vy_page_info *page_info, uint64_t offset,
140,872✔
246
                    const char *min_key, struct key_def *cmp_def)
247
{
248
        memset(page_info, 0, sizeof(*page_info));
140,872✔
249
        page_info->offset = offset;
140,872✔
250
        page_info->unpacked_size = 0;
140,872✔
251
        page_info->min_key = vy_key_dup(min_key);
140,872✔
252
        if (page_info->min_key == NULL)
140,872!
253
                return -1;
×
254
        uint32_t part_count = mp_decode_array(&min_key);
140,872✔
255
        page_info->min_key_hint = key_hint(min_key, part_count, cmp_def);
140,872✔
256
        return page_info->min_key == NULL ? -1 : 0;
140,872!
257
}
258

259
/**
260
 * Destroy page info struct
261
 */
262
static void
263
vy_page_info_destroy(struct vy_page_info *page_info)
132,514✔
264
{
265
        if (page_info->min_key != NULL)
132,514!
266
                free(page_info->min_key);
132,514✔
267
}
132,514✔
268

269
struct vy_run *
270
vy_run_new(struct vy_run_env *env, int64_t id)
3,566✔
271
{
272
        struct vy_run *run = calloc(1, sizeof(struct vy_run));
3,566✔
273
        if (unlikely(run == NULL)) {
3,566!
274
                diag_set(OutOfMemory, sizeof(struct vy_run), "malloc",
×
275
                         "struct vy_run");
276
                return NULL;
×
277
        }
278
        run->env = env;
3,566✔
279
        run->id = id;
3,566✔
280
        run->dump_lsn = -1;
3,566✔
281
        run->fd = -1;
3,566✔
282
        run->refs = 1;
3,566✔
283
        rlist_create(&run->in_lsm);
3,566✔
284
        rlist_create(&run->in_unused);
3,566✔
285
        return run;
3,566✔
286
}
287

288
static void
289
vy_run_clear(struct vy_run *run)
2,759✔
290
{
291
        if (run->page_info != NULL) {
2,759✔
292
                uint32_t page_no;
293
                for (page_no = 0; page_no < run->info.page_count; ++page_no)
135,167✔
294
                        vy_page_info_destroy(run->page_info + page_no);
132,514✔
295
                free(run->page_info);
2,653✔
296
        }
297
        run->page_info = NULL;
2,759✔
298
        run->page_index_size = 0;
2,759✔
299
        run->info.page_count = 0;
2,759✔
300
        if (run->info.bloom != NULL) {
2,759✔
301
                tuple_bloom_delete(run->info.bloom);
2,649✔
302
                run->info.bloom = NULL;
2,649✔
303
        }
304
        free(run->info.min_key);
2,759✔
305
        run->info.min_key = NULL;
2,759✔
306
        free(run->info.max_key);
2,759✔
307
        run->info.max_key = NULL;
2,759✔
308
}
2,759✔
309

310
void
311
vy_run_delete(struct vy_run *run)
2,753✔
312
{
313
        assert(run->refs == 0);
2,753!
314
        if (run->fd >= 0 && close(run->fd) < 0)
2,753!
315
                say_syserror("close failed");
×
316
        vy_run_clear(run);
2,753✔
317
        TRASH(run);
2,753✔
318
        free(run);
2,753✔
319
}
2,753✔
320

321
size_t
322
vy_run_bloom_size(struct vy_run *run)
6,084✔
323
{
324
        return run->info.bloom == NULL ? 0 : tuple_bloom_size(run->info.bloom);
6,084✔
325
}
326

327
/**
328
 * Find a page from which the iteration of a given key must be started.
329
 * LE and LT: the found page definitely contains the position
330
 *  for iteration start.
331
 * GE, GT, EQ: Since page search uses only min_key of pages,
332
 *  it may happen that the found page doesn't contain the position
333
 *  for iteration start. In this case it is certain that the iteration
334
 *  must be started from the beginning of the next page.
335
 *
336
 * @param run - run
337
 * @param key - key to find
338
 * @param key_def - key_def for comparison
339
 * @param itype - iterator type (see above)
340
 * @param equal_key: *equal_key is set to true if there is a page
341
 *  with min_key equal to the given key.
342
 * @return offset of the page in page index OR run->info.page_count if
343
 *  there no pages fulfilling the conditions.
344
 */
345
static uint32_t
346
vy_page_index_find_page(struct vy_run *run, struct vy_entry key,
88,167✔
347
                        struct key_def *cmp_def, enum iterator_type itype,
348
                        bool *equal_key)
349
{
350
        if (itype == ITER_EQ)
88,167✔
351
                itype = ITER_GE; /* One day it'll become obsolete */
75,645✔
352
        assert(itype == ITER_GE || itype == ITER_GT ||
88,167!
353
               itype == ITER_LE || itype == ITER_LT);
354
        int dir = iterator_direction(itype);
88,167!
355
        *equal_key = false;
88,167✔
356

357
        /**
358
         * Binary search in page index. Depends on given iterator_type:
359
         *  ITER_GE: lowest page with min_key >= given key.
360
         *  ITER_GT: lowest page with min_key > given key.
361
         *  ITER_LE: highest page with min_key <= given key.
362
         *  ITER_LT: highest page with min_key < given key.
363
         *
364
         * Example: we are searching for a value 2 in the run of 10 pages:
365
         * min_key:         [1   1   2   2   2   2   2   3   3   3]
366
         * we want to find: [    LT  GE              LE  GT       ]
367
         * For LT and GE it's a classical lower_bound search.
368
         * Let's set up a range with left page's min_key < key and
369
         *  right page's min >= key; binary cut the range until it
370
         *  becomes of length 1 and then LT pos = left bound of the range
371
         *  and GE pos = right bound of the range.
372
         * For LE and GT it's a classical upper_bound search.
373
         * Let's set up a range with left page's min_key <= key and
374
         *  right page's min > key; binary cut the range until it
375
         *  becomes of length 1 and then LE pos = left bound of the range
376
         *  and GT pos = right bound of the range.
377
         */
378
        bool is_lower_bound = itype == ITER_LT || itype == ITER_GE;
88,167✔
379

380
        assert(run->info.page_count > 0);
88,167!
381
        /* Initially the range is set with virtual positions */
382
        int32_t range[2] = { -1, run->info.page_count };
88,167✔
383
        assert(run->info.page_count > 0);
88,167!
384
        do {
385
                int32_t mid = range[0] + (range[1] - range[0]) / 2;
546,855✔
386
                struct vy_page_info *info = vy_run_page_info(run, mid);
546,855!
387
                int cmp = vy_entry_compare_with_raw_key(key, info->min_key,
546,855!
388
                                                        info->min_key_hint,
389
                                                        cmp_def);
390
                if (is_lower_bound)
546,855✔
391
                        range[cmp <= 0] = mid;
529,048✔
392
                else
393
                        range[cmp < 0] = mid;
17,807✔
394
                *equal_key = *equal_key || cmp == 0;
546,855✔
395
        } while (range[1] - range[0] > 1);
546,855✔
396
        if (range[0] < 0)
88,167✔
397
                range[0] = run->info.page_count;
2,383✔
398
        uint32_t page = range[dir > 0];
88,167✔
399

400
        /**
401
         * Since page search uses only min_key of pages,
402
         *  for GE, GT and EQ the previous page can contain
403
         *  the point where iteration must be started.
404
         */
405
        if (page > 0 && dir > 0)
88,167✔
406
                return page - 1;
88,167✔
407
        return page;
7,319✔
408
}
409

410
struct vy_slice *
411
vy_slice_new(int64_t id, struct vy_run *run, struct vy_entry begin,
6,306✔
412
             struct vy_entry end, struct key_def *cmp_def)
413
{
414
        struct vy_slice *slice = malloc(sizeof(*slice));
6,306✔
415
        if (slice == NULL) {
6,306!
416
                diag_set(OutOfMemory, sizeof(*slice),
×
417
                         "malloc", "struct vy_slice");
418
                return NULL;
6,306✔
419
        }
420
        memset(slice, 0, sizeof(*slice));
6,306✔
421
        slice->id = id;
6,306✔
422
        slice->run = run;
6,306✔
423
        slice->seed = rand();
6,306✔
424
        vy_run_ref(run);
6,306!
425
        run->slice_count++;
6,306✔
426
        if (begin.stmt != NULL)
6,306✔
427
                tuple_ref(begin.stmt);
2,676!
428
        slice->begin = begin;
6,306✔
429
        if (end.stmt != NULL)
6,306✔
430
                tuple_ref(end.stmt);
2,684!
431
        slice->end = end;
6,306✔
432
        rlist_create(&slice->in_range);
6,306✔
433
        fiber_cond_create(&slice->pin_cond);
6,306!
434
        if (run->info.page_count == 0) {
6,306!
435
                /* The run is empty hence the slice is empty too. */
436
                return slice;
×
437
        }
438
        /** Lookup the first and the last pages spanned by the slice. */
439
        bool unused;
440
        if (slice->begin.stmt == NULL) {
6,306✔
441
                slice->first_page_no = 0;
3,630✔
442
        } else {
443
                slice->first_page_no =
2,676✔
444
                        vy_page_index_find_page(run, slice->begin, cmp_def,
2,676!
445
                                                ITER_GE, &unused);
446
                assert(slice->first_page_no < run->info.page_count);
2,676!
447
        }
448
        if (slice->end.stmt == NULL) {
6,306✔
449
                slice->last_page_no = run->info.page_count - 1;
3,622✔
450
        } else {
451
                slice->last_page_no =
2,684✔
452
                        vy_page_index_find_page(run, slice->end, cmp_def,
2,684!
453
                                                ITER_LT, &unused);
454
                if (slice->last_page_no == run->info.page_count) {
2,684✔
455
                        /* It's an empty slice */
456
                        slice->first_page_no = 0;
7✔
457
                        slice->last_page_no = 0;
7✔
458
                        return slice;
7✔
459
                }
460
        }
461
        assert(slice->last_page_no >= slice->first_page_no);
6,299!
462
        /** Estimate the number of statements in the slice. */
463
        uint32_t run_pages = run->info.page_count;
6,299✔
464
        uint32_t slice_pages = slice->last_page_no - slice->first_page_no + 1;
6,299✔
465
        slice->count.pages = slice_pages;
6,299✔
466
        slice->count.rows = DIV_ROUND_UP(run->count.rows *
6,299✔
467
                                         slice_pages, run_pages);
468
        slice->count.bytes = DIV_ROUND_UP(run->count.bytes *
6,299✔
469
                                          slice_pages, run_pages);
470
        slice->count.bytes_compressed = DIV_ROUND_UP(
6,299✔
471
                run->count.bytes_compressed * slice_pages, run_pages);
472
        return slice;
6,299✔
473
}
474

475
void
476
vy_slice_delete(struct vy_slice *slice)
5,421✔
477
{
478
        assert(slice->pin_count == 0);
5,421!
479
        assert(slice->run->slice_count > 0);
5,421!
480
        slice->run->slice_count--;
5,421✔
481
        vy_run_unref(slice->run);
5,421✔
482
        if (slice->begin.stmt != NULL)
5,421✔
483
                tuple_unref(slice->begin.stmt);
2,583✔
484
        if (slice->end.stmt != NULL)
5,421✔
485
                tuple_unref(slice->end.stmt);
2,586✔
486
        fiber_cond_destroy(&slice->pin_cond);
5,421✔
487
        TRASH(slice);
5,421✔
488
        free(slice);
5,421✔
489
}
5,421✔
490

491
int
492
vy_slice_cut(struct vy_slice *slice, int64_t id, struct vy_entry begin,
1,744✔
493
             struct vy_entry end, struct key_def *cmp_def,
494
             struct vy_slice **result)
495
{
496
        *result = NULL;
1,744✔
497

498
        if (begin.stmt != NULL && slice->end.stmt != NULL &&
1,744✔
499
            vy_entry_compare(begin, slice->end, cmp_def) >= 0)
913!
500
                return 0; /* no intersection: begin >= slice->end */
×
501

502
        if (end.stmt != NULL && slice->begin.stmt != NULL &&
1,744✔
503
            vy_entry_compare(end, slice->begin, cmp_def) <= 0)
915!
504
                return 0; /* no intersection: end <= slice->end */
×
505

506
        /* begin = MAX(begin, slice->begin) */
507
        if (slice->begin.stmt != NULL &&
1,744✔
508
            (begin.stmt == NULL || vy_entry_compare(begin, slice->begin,
1,018!
509
                                                    cmp_def) < 0))
510
                begin = slice->begin;
×
511

512
        /* end = MIN(end, slice->end) */
513
        if (slice->end.stmt != NULL &&
1,744✔
514
            (end.stmt == NULL || vy_entry_compare(end, slice->end,
1,014!
515
                                                  cmp_def) > 0))
516
                end = slice->end;
×
517

518
        *result = vy_slice_new(id, slice->run, begin, end, cmp_def);
1,744✔
519
        if (*result == NULL)
1,744!
520
                return -1; /* OOM */
×
521

522
        return 0;
1,744✔
523
}
524

525
/**
526
 * Decode page information from xrow.
527
 *
528
 * @param[out] page Page information.
529
 * @param xrow      Xrow to decode.
530
 * @param cmp_def   Definition of keys stored in the page.
531
 * @param filename  Filename for error reporting.
532
 *
533
 * @retval  0 Success.
534
 * @retval -1 Error.
535
 */
536
static int
537
vy_page_info_decode(struct vy_page_info *page, const struct xrow_header *xrow,
16,264✔
538
                    struct key_def *cmp_def, const char *filename)
539
{
540
        assert(xrow->type == VY_INDEX_PAGE_INFO);
16,264!
541
        const char *pos = xrow->body->iov_base;
16,264✔
542
        memset(page, 0, sizeof(*page));
16,264✔
543
        uint64_t key_map = vy_page_info_key_map;
16,264✔
544
        uint32_t map_size = mp_decode_map(&pos);
16,264!
545
        uint32_t map_item;
546
        const char *key_beg;
547
        uint32_t part_count;
548
        for (map_item = 0; map_item < map_size; ++map_item) {
113,848✔
549
                uint32_t key = mp_decode_uint(&pos);
97,584!
550
                key_map &= ~(1ULL << key);
97,584✔
551
                switch (key) {
97,584!
552
                case VY_PAGE_INFO_OFFSET:
16,264✔
553
                        page->offset = mp_decode_uint(&pos);
16,264!
554
                        break;
16,264✔
555
                case VY_PAGE_INFO_SIZE:
16,264✔
556
                        page->size = mp_decode_uint(&pos);
16,264!
557
                        break;
16,264✔
558
                case VY_PAGE_INFO_ROW_COUNT:
16,264✔
559
                        page->row_count = mp_decode_uint(&pos);
16,264!
560
                        break;
16,264✔
561
                case VY_PAGE_INFO_MIN_KEY:
16,264✔
562
                        key_beg = pos;
16,264✔
563
                        mp_next(&pos);
16,264!
564
                        page->min_key = vy_key_dup(key_beg);
16,264!
565
                        if (page->min_key == NULL)
16,264!
566
                                return -1;
16,264✔
567
                        part_count = mp_decode_array(&key_beg);
16,264✔
568
                        page->min_key_hint = key_hint(key_beg, part_count,
16,264!
569
                                                      cmp_def);
570
                        break;
16,264✔
571
                case VY_PAGE_INFO_UNPACKED_SIZE:
16,264✔
572
                        page->unpacked_size = mp_decode_uint(&pos);
16,264!
573
                        break;
16,264✔
574
                case VY_PAGE_INFO_ROW_INDEX_OFFSET:
16,264✔
575
                        page->row_index_offset = mp_decode_uint(&pos);
16,264!
576
                        break;
16,264✔
577
                default:
×
578
                        mp_next(&pos); /* unknown key, ignore */
×
579
                        break;
×
580
                }
581
        }
582
        if (key_map) {
16,264!
583
                enum vy_page_info_key key = bit_ctz_u64(key_map);
×
584
                diag_set(ClientError, ER_INVALID_INDEX_FILE, filename,
×
585
                         tt_sprintf("Can't decode page info: "
586
                                    "missing mandatory key %s",
587
                                    vy_page_info_key_name(key)));
588
                return -1;
×
589
        }
590

591
        return 0;
16,264✔
592
}
593

594
/** Decode statement statistics from @data and advance @data. */
595
static void
596
vy_stmt_stat_decode(struct vy_stmt_stat *stat, const char **data)
277✔
597
{
598
        uint32_t size = mp_decode_map(data);
277✔
599
        for (uint32_t i = 0; i < size; i++) {
1,385✔
600
                uint64_t key = mp_decode_uint(data);
1,108✔
601
                uint64_t value = mp_decode_uint(data);
1,108✔
602
                switch (key) {
1,108!
603
                case IPROTO_INSERT:
277✔
604
                        stat->inserts = value;
277✔
605
                        break;
277✔
606
                case IPROTO_REPLACE:
277✔
607
                        stat->replaces = value;
277✔
608
                        break;
277✔
609
                case IPROTO_DELETE:
277✔
610
                        stat->deletes = value;
277✔
611
                        break;
277✔
612
                case IPROTO_UPSERT:
277✔
613
                        stat->upserts = value;
277✔
614
                        break;
277✔
615
                default:
×
616
                        break;
×
617
                }
618
        }
619
}
277✔
620

621
/**
622
 * Decode the run metadata from xrow.
623
 *
624
 * @param xrow xrow to decode
625
 * @param[out] run_info the run information
626
 * @param filename File name for error reporting.
627
 *
628
 * @retval  0 success
629
 * @retval -1 error (check diag)
630
 */
631
int
632
vy_run_info_decode(struct vy_run_info *run_info,
277✔
633
                   const struct xrow_header *xrow,
634
                   const char *filename)
635
{
636
        assert(xrow->type == VY_INDEX_RUN_INFO);
277!
637
        /* decode run */
638
        const char *pos = xrow->body->iov_base;
277✔
639
        memset(run_info, 0, sizeof(*run_info));
277✔
640
        uint64_t key_map = vy_run_info_key_map;
277✔
641
        uint32_t map_size = mp_decode_map(&pos);
277!
642
        uint32_t map_item;
643
        const char *tmp;
644
        /* decode run values */
645
        for (map_item = 0; map_item < map_size; ++map_item) {
2,216✔
646
                uint32_t key = mp_decode_uint(&pos);
1,939!
647
                key_map &= ~(1ULL << key);
1,939✔
648
                switch (key) {
1,939!
649
                case VY_RUN_INFO_MIN_KEY:
277✔
650
                        tmp = pos;
277✔
651
                        mp_next(&pos);
277!
652
                        run_info->min_key = vy_key_dup(tmp);
277!
653
                        if (run_info->min_key == NULL)
277!
654
                                return -1;
277✔
655
                        break;
277✔
656
                case VY_RUN_INFO_MAX_KEY:
277✔
657
                        tmp = pos;
277✔
658
                        mp_next(&pos);
277!
659
                        run_info->max_key = vy_key_dup(tmp);
277!
660
                        if (run_info->max_key == NULL)
277!
661
                                return -1;
×
662
                        break;
277✔
663
                case VY_RUN_INFO_MIN_LSN:
277✔
664
                        run_info->min_lsn = mp_decode_uint(&pos);
277!
665
                        break;
277✔
666
                case VY_RUN_INFO_MAX_LSN:
277✔
667
                        run_info->max_lsn = mp_decode_uint(&pos);
277!
668
                        break;
277✔
669
                case VY_RUN_INFO_PAGE_COUNT:
277✔
670
                        run_info->page_count = mp_decode_uint(&pos);
277!
671
                        break;
277✔
672
                case VY_RUN_INFO_BLOOM_FILTER_LEGACY:
×
673
                        run_info->bloom = tuple_bloom_decode_legacy(&pos);
×
674
                        if (run_info->bloom == NULL)
×
675
                                return -1;
×
676
                        break;
×
677
                case VY_RUN_INFO_BLOOM_FILTER:
277✔
678
                        run_info->bloom = tuple_bloom_decode(&pos);
277!
679
                        if (run_info->bloom == NULL)
277!
680
                                return -1;
×
681
                        break;
277✔
682
                case VY_RUN_INFO_STMT_STAT:
277✔
683
                        vy_stmt_stat_decode(&run_info->stmt_stat, &pos);
277!
684
                        break;
277✔
685
                default:
×
686
                        mp_next(&pos); /* unknown key, ignore */
×
687
                        break;
×
688
                }
689
        }
690
        if (key_map) {
277!
691
                enum vy_run_info_key key = bit_ctz_u64(key_map);
×
692
                diag_set(ClientError, ER_INVALID_INDEX_FILE, filename,
×
693
                         tt_sprintf("Can't decode run info: "
694
                                    "missing mandatory key %s",
695
                                    vy_run_info_key_name(key)));
696
                return -1;
×
697
        }
698
        return 0;
277✔
699
}
700

701
static struct vy_page *
702
vy_page_new(const struct vy_page_info *page_info)
215,605✔
703
{
704
        struct vy_page *page = malloc(sizeof(*page));
215,605✔
705
        if (page == NULL) {
215,605!
706
                diag_set(OutOfMemory, sizeof(*page),
×
707
                         "load_page", "page cache");
708
                return NULL;
×
709
        }
710
        page->unpacked_size = page_info->unpacked_size;
215,605✔
711
        page->row_count = page_info->row_count;
215,605✔
712
        page->row_index = calloc(page_info->row_count, sizeof(uint32_t));
215,605✔
713
        if (page->row_index == NULL) {
215,605!
714
                diag_set(OutOfMemory, page_info->row_count * sizeof(uint32_t),
×
715
                         "malloc", "page->row_index");
716
                free(page);
×
717
                return NULL;
×
718
        }
719

720
        page->data = (char *)malloc(page_info->unpacked_size);
215,605✔
721
        if (page->data == NULL) {
215,605!
722
                diag_set(OutOfMemory, page_info->unpacked_size,
×
723
                         "malloc", "page->data");
724
                free(page->row_index);
×
725
                free(page);
×
726
                return NULL;
×
727
        }
728
        return page;
215,605✔
729
}
730

731
static void
732
vy_page_delete(struct vy_page *page)
215,573✔
733
{
734
        uint32_t *row_index = page->row_index;
215,573✔
735
        char *data = page->data;
215,573✔
736
#if !defined(NDEBUG)
737
        memset(row_index, '#', sizeof(uint32_t) * page->row_count);
215,573✔
738
        memset(data, '#', page->unpacked_size);
215,573✔
739
        memset(page, '#', sizeof(*page));
215,573✔
740
#endif /* !defined(NDEBUG) */
741
        free(row_index);
215,573✔
742
        free(data);
215,573✔
743
        free(page);
215,573✔
744
}
215,573✔
745

746
static int
747
vy_page_xrow(struct vy_page *page, uint32_t stmt_no,
4,937,320✔
748
             struct xrow_header *xrow)
749
{
750
        assert(stmt_no < page->row_count);
4,937,320!
751
        const char *data = page->data + page->row_index[stmt_no];
4,937,320✔
752
        const char *data_end = stmt_no + 1 < page->row_count ?
4,937,320✔
753
                               page->data + page->row_index[stmt_no + 1] :
4,782,800✔
754
                               page->data + page->unpacked_size;
154,519✔
755
        return xrow_header_decode(xrow, &data, data_end, false);
4,937,320!
756
}
757

758
/* {{{ vy_run_iterator support functions */
759

760
/**
761
 * Read raw stmt data from the page
762
 * @param page          Page.
763
 * @param stmt_no       Statement position in the page.
764
 * @param cmp_def       Definition of keys stored in the page.
765
 * @param format        Format for REPLACE/DELETE tuples.
766
 *
767
 * @retval not NULL Statement read from page.
768
 * @retval     NULL Memory error.
769
 */
770
static struct vy_entry
771
vy_page_stmt(struct vy_page *page, uint32_t stmt_no,
4,937,320✔
772
             struct key_def *cmp_def, struct tuple_format *format)
773
{
774
        struct xrow_header xrow;
775
        if (vy_page_xrow(page, stmt_no, &xrow) != 0)
4,937,320!
776
                return vy_entry_none();
4,937,320!
777
        struct vy_entry entry;
778
        entry.stmt = vy_stmt_decode(&xrow, format);
4,937,320!
779
        if (entry.stmt == NULL)
4,937,320✔
780
                return vy_entry_none();
1!
781
        entry.hint = vy_stmt_hint(entry.stmt, cmp_def);
4,937,320!
782
        return entry;
4,937,320✔
783
}
784

785
/**
786
 * Binary search in page
787
 * In terms of STL, makes lower_bound for EQ,GE,LT and upper_bound for GT,LE
788
 * Additionally *equal_key argument is set to true if the found value is
789
 * equal to given key (set to false otherwise).
790
 * @retval position in the page
791
 */
792
static uint32_t
793
vy_page_find_key(struct vy_page *page, struct vy_entry key,
84,440✔
794
                 struct key_def *cmp_def, struct tuple_format *format,
795
                 enum iterator_type iterator_type, bool *equal_key)
796
{
797
        uint32_t beg = 0;
84,440✔
798
        uint32_t end = page->row_count;
84,440✔
799
        *equal_key = false;
84,440✔
800
        /* for upper bound we change zero comparison result to -1 */
801
        int zero_cmp = (iterator_type == ITER_GT ||
80,921✔
802
                        iterator_type == ITER_LE ? -1 : 0);
165,361✔
803
        while (beg != end) {
431,848✔
804
                uint32_t mid = beg + (end - beg) / 2;
347,408✔
805
                struct vy_entry fnd_key = vy_page_stmt(page, mid, cmp_def,
347,408!
806
                                                       format);
807
                if (fnd_key.stmt == NULL)
347,408!
808
                        return end;
×
809
                int cmp = vy_entry_compare(fnd_key, key, cmp_def);
347,408!
810
                cmp = cmp ? cmp : zero_cmp;
347,408✔
811
                *equal_key = *equal_key || cmp == 0;
347,408✔
812
                if (cmp < 0)
347,408✔
813
                        beg = mid + 1;
177,302✔
814
                else
815
                        end = mid;
170,106✔
816
                tuple_unref(fnd_key.stmt);
347,408!
817
        }
818
        return end;
84,440✔
819
}
820

821
/**
822
 * End iteration and free cached data.
823
 */
824
static void
825
vy_run_iterator_stop(struct vy_run_iterator *itr)
995,916✔
826
{
827
        if (itr->curr.stmt != NULL) {
995,916✔
828
                tuple_unref(itr->curr.stmt);
65,299✔
829
                itr->curr = vy_entry_none();
65,299✔
830
        }
831
        if (itr->curr_page != NULL) {
995,916✔
832
                vy_page_delete(itr->curr_page);
82,307✔
833
                if (itr->prev_page != NULL)
82,307✔
834
                        vy_page_delete(itr->prev_page);
7,727✔
835
                itr->curr_page = itr->prev_page = NULL;
82,307✔
836
        }
837
}
995,916✔
838

839
static int
840
vy_row_index_decode(uint32_t *row_index, uint32_t row_count,
215,600✔
841
                    struct xrow_header *xrow)
842
{
843
        assert(xrow->type == VY_RUN_ROW_INDEX);
215,600!
844
        const char *pos = xrow->body->iov_base;
215,600✔
845
        uint32_t map_size = mp_decode_map(&pos);
215,600!
846
        uint32_t map_item;
847
        uint32_t size = 0;
215,600✔
848
        for (map_item = 0; map_item < map_size; ++map_item) {
431,200✔
849
                uint32_t key = mp_decode_uint(&pos);
215,600!
850
                switch (key) {
215,600!
851
                case VY_ROW_INDEX_DATA:
215,600✔
852
                        size = mp_decode_binl(&pos);
215,600!
853
                        break;
215,600✔
854
                }
855
        }
215,600✔
856
        if (size != sizeof(uint32_t) * row_count) {
215,600!
857
                diag_set(ClientError, ER_INVALID_RUN_FILE,
×
858
                         tt_sprintf("Wrong row index size "
859
                                    "(expected %zu, got %u",
860
                                    sizeof(uint32_t) * row_count,
861
                                    (unsigned)size));
862
                return -1;
215,600✔
863
        }
864
        for (uint32_t i = 0; i < row_count; ++i) {
7,074,730✔
865
                row_index[i] = mp_load_u32(&pos);
6,859,130!
866
        }
867
        assert(pos == xrow->body->iov_base + xrow->body->iov_len);
215,600!
868
        return 0;
215,600✔
869
}
870

871
/** Return the name of a run data file. */
872
static inline const char *
873
vy_run_filename(struct vy_run *run)
2✔
874
{
875
        char *buf = tt_static_buf();
2✔
876
        vy_run_snprint_filename(buf, TT_STATIC_BUF_LEN, run->id, VY_FILE_RUN);
2✔
877
        return buf;
2✔
878
}
879

880
/**
881
 * Read a page requests from vinyl xlog data file.
882
 *
883
 * @retval 0 on success
884
 * @retval -1 on error, check diag
885
 */
886
static int
887
vy_page_read(struct vy_page *page, const struct vy_page_info *page_info,
215,605✔
888
             struct vy_run *run, ZSTD_DStream *zdctx)
889
{
890
        /* read xlog tx from xlog file */
891
        size_t region_svp = region_used(&fiber()->gc);
215,605!
892
        char *data = (char *)region_alloc(&fiber()->gc, page_info->size);
215,605!
893
        if (data == NULL) {
215,605!
894
                diag_set(OutOfMemory, page_info->size, "region gc", "page");
×
895
                return -1;
215,602✔
896
        }
897
        ssize_t readen = fio_pread(run->fd, data, page_info->size,
431,209✔
898
                                   page_info->offset);
215,605✔
899
        ERROR_INJECT(ERRINJ_VYRUN_DATA_READ, {
215,604!
900
                readen = -1;
901
                errno = EIO;});
902
        if (readen < 0) {
215,604✔
903
                diag_set(SystemError, "failed to read from file");
1!
904
                goto error;
1✔
905
        }
906
        if (readen != (ssize_t)page_info->size) {
215,603!
907
                diag_set(ClientError, ER_INVALID_RUN_FILE,
×
908
                         "Unexpected end of file");
909
                goto error;
×
910
        }
911

912
        struct errinj *inj = errinj(ERRINJ_VY_READ_PAGE_TIMEOUT, ERRINJ_DOUBLE);
215,603!
913
        if (inj != NULL && inj->dparam > 0)
215,603!
914
                thread_sleep(inj->dparam);
210✔
915

916
        ERROR_INJECT_SLEEP(ERRINJ_VY_READ_PAGE_DELAY);
216,085!
917

918
        /* decode xlog tx */
919
        const char *data_pos = data;
215,601✔
920
        const char *data_end = data + readen;
215,601✔
921
        char *rows = page->data;
215,601✔
922
        char *rows_end = rows + page_info->unpacked_size;
215,601✔
923
        if (xlog_tx_decode(data, data_end, rows, rows_end, zdctx) != 0)
215,601!
924
                goto error;
1✔
925

926
        struct xrow_header xrow;
927
        data_pos = page->data + page_info->row_index_offset;
215,600✔
928
        data_end = page->data + page_info->unpacked_size;
215,600✔
929
        if (xrow_header_decode(&xrow, &data_pos, data_end, true) == -1)
215,600!
930
                goto error;
×
931
        if (xrow.type != VY_RUN_ROW_INDEX) {
215,600!
932
                diag_set(ClientError, ER_INVALID_RUN_FILE,
×
933
                         tt_sprintf("Wrong row index type "
934
                                    "(expected %d, got %u)",
935
                                    VY_RUN_ROW_INDEX, (unsigned)xrow.type));
936
                goto error;
×
937
        }
938
        if (vy_row_index_decode(page->row_index, page->row_count, &xrow) != 0)
215,600!
939
                goto error;
×
940
        region_truncate(&fiber()->gc, region_svp);
215,600!
941
        ERROR_INJECT(ERRINJ_VY_READ_PAGE, {
215,600!
942
                diag_set(ClientError, ER_INJECTION, "vinyl page read");
943
                return -1;});
944
        return 0;
215,597✔
945
error:
2✔
946
        region_truncate(&fiber()->gc, region_svp);
2!
947
        diag_log();
2!
948
        say_error("error reading %s@%llu:%u", vy_run_filename(run),
2!
949
                  (unsigned long long)page_info->offset,
950
                  (unsigned)page_info->size);
951
        return -1;
2✔
952
}
953

954
/**
955
 * Get thread local zstd decompression context
956
 */
957
static ZSTD_DStream *
958
vy_env_get_zdctx(struct vy_run_env *env)
215,605✔
959
{
960
        ZSTD_DStream *zdctx = tt_pthread_getspecific(env->zdctx_key);
215,605✔
961
        if (zdctx == NULL) {
215,605✔
962
                zdctx = ZSTD_createDStream();
384✔
963
                if (zdctx == NULL) {
384!
964
                        diag_set(OutOfMemory, sizeof(zdctx), "malloc",
×
965
                                 "zstd context");
966
                        return NULL;
×
967
                }
968
                tt_pthread_setspecific(env->zdctx_key, zdctx);
384!
969
        }
970
        return zdctx;
215,605✔
971
}
972

973
/**
974
 * vinyl read task callback
975
 */
976
static int
977
vy_page_read_cb(struct cbus_call_msg *base)
103,827✔
978
{
979
        struct vy_page_read_task *task = (struct vy_page_read_task *)base;
103,827✔
980
        ZSTD_DStream *zdctx = vy_env_get_zdctx(task->run->env);
103,827✔
981
        if (zdctx == NULL)
103,827!
982
                return -1;
×
983
        if (vy_page_read(task->page, task->page_info, task->run, zdctx) != 0)
103,827✔
984
                return -1;
5✔
985
        if (task->key.stmt != NULL) {
103,820✔
986
                task->pos_in_page = vy_page_find_key(task->page, task->key,
81,321✔
987
                                                     task->cmp_def, task->format,
988
                                                     task->iterator_type,
989
                                                     &task->equal_found);
990
        }
991
        return 0;
103,820✔
992
}
993

994
/**
995
 * Read a page from disk given its number.
996
 * The function caches two most recently read pages.
997
 *
998
 * @retval 0 success
999
 * @retval -1 critical error
1000
 */
1001
static NODISCARD int
1002
vy_run_iterator_load_page(struct vy_run_iterator *itr, uint32_t page_no,
421,806✔
1003
                          struct vy_entry key, enum iterator_type iterator_type,
1004
                          struct vy_page **result, uint32_t *pos_in_page,
1005
                          bool *equal_found)
1006
{
1007
        struct vy_slice *slice = itr->slice;
421,806✔
1008
        struct vy_run_env *env = slice->run->env;
421,806✔
1009

1010
        /* Check cache */
1011
        struct vy_page *page = NULL;
421,806✔
1012
        if (itr->curr_page != NULL &&
421,806✔
1013
            itr->curr_page->page_no == page_no) {
339,488✔
1014
                page = itr->curr_page;
317,979✔
1015
        } else if (itr->prev_page != NULL &&
103,827✔
1016
                   itr->prev_page->page_no == page_no) {
13,781!
1017
                SWAP(itr->prev_page, itr->curr_page);
×
1018
                page = itr->curr_page;
×
1019
        }
1020
        if (page != NULL) {
421,806✔
1021
                if (key.stmt != NULL)
317,979✔
1022
                        *pos_in_page = vy_page_find_key(page, key, itr->cmp_def,
1,038✔
1023
                                                        itr->format, iterator_type,
1024
                                                        equal_found);
1025
                *result = page;
317,979✔
1026
                return 0;
317,979✔
1027
        }
1028

1029
        /* Allocate buffers */
1030
        struct vy_page_info *page_info = vy_run_page_info(slice->run, page_no);
103,827✔
1031
        page = vy_page_new(page_info);
103,827✔
1032
        if (page == NULL)
103,827!
1033
                return -1;
×
1034

1035
        /* Read page data from the disk */
1036
        struct vy_page_read_task *task = mempool_alloc(&env->read_task_pool);
103,827✔
1037
        if (task == NULL) {
103,827!
1038
                diag_set(OutOfMemory, sizeof(*task),
×
1039
                         "mempool", "vy_page_read_task");
1040
                vy_page_delete(page);
×
1041
                return -1;
×
1042
        }
1043
        task->run = slice->run;
103,827✔
1044
        task->page_info = page_info;
103,827✔
1045
        task->page = page;
103,827✔
1046
        task->key = key;
103,827✔
1047
        task->iterator_type = iterator_type;
103,827✔
1048
        task->cmp_def = itr->cmp_def;
103,827✔
1049
        task->format = itr->format;
103,827✔
1050
        task->pos_in_page = 0;
103,827✔
1051
        task->equal_found = false;
103,827✔
1052

1053
        int rc = vy_run_env_coio_call(env, &task->base, vy_page_read_cb);
103,827✔
1054

1055
        *pos_in_page = task->pos_in_page;
103,825✔
1056
        *equal_found = task->equal_found;
103,825✔
1057

1058
        mempool_free(&env->read_task_pool, task);
103,825✔
1059
        if (rc != 0) {
103,825✔
1060
                vy_page_delete(page);
7✔
1061
                return -1;
7✔
1062
        }
1063

1064
        /* Update cache */
1065
        if (itr->prev_page != NULL)
103,818✔
1066
                vy_page_delete(itr->prev_page);
13,781✔
1067
        itr->prev_page = itr->curr_page;
103,818✔
1068
        itr->curr_page = page;
103,818✔
1069
        page->page_no = page_no;
103,818✔
1070

1071
        /* Update read statistics. */
1072
        itr->stat->read.rows += page_info->row_count;
103,818✔
1073
        itr->stat->read.bytes += page_info->unpacked_size;
103,818✔
1074
        itr->stat->read.bytes_compressed += page_info->size;
103,818✔
1075
        itr->stat->read.pages++;
103,818✔
1076

1077
        *result = page;
103,818✔
1078
        return 0;
103,818✔
1079
}
1080

1081
/**
1082
 * Read key and lsn by a given wide position.
1083
 * For the first record in a page reads the result from the page
1084
 * index instead of fetching it from disk.
1085
 *
1086
 * @retval 0 success
1087
 * @retval -1 read error or out of memory.
1088
 */
1089
static NODISCARD int
1090
vy_run_iterator_read(struct vy_run_iterator *itr,
339,446✔
1091
                     struct vy_run_iterator_pos pos,
1092
                     struct vy_entry *ret)
1093
{
1094
        struct vy_page *page;
1095
        bool equal_found;
1096
        uint32_t pos_in_page;
1097
        int rc = vy_run_iterator_load_page(itr, pos.page_no, vy_entry_none(),
339,446!
1098
                                           ITER_GE, &page, &pos_in_page,
1099
                                           &equal_found);
1100
        if (rc != 0)
339,445✔
1101
                return rc;
339,445✔
1102
        *ret = vy_page_stmt(page, pos.pos_in_page, itr->cmp_def, itr->format);
339,438!
1103
        if (ret->stmt == NULL)
339,438!
1104
                return -1;
×
1105
        return 0;
339,438✔
1106
}
1107

1108
/**
1109
 * Binary search in a run for the given key.
1110
 * In terms of STL, makes lower_bound for EQ,GE,LT and upper_bound for GT,LE
1111
 * Resulting wide position is stored it *pos argument
1112
 * Additionally *equal_key argument is set to true if the found value is
1113
 * equal to given key (untouched otherwise)
1114
 *
1115
 * @retval 0 success
1116
 * @retval 1 EOF
1117
 * @retval -1 read or memory error
1118
 */
1119
static NODISCARD int
1120
vy_run_iterator_search(struct vy_run_iterator *itr,
82,807✔
1121
                       enum iterator_type iterator_type, struct vy_entry key,
1122
                       struct vy_run_iterator_pos *pos, bool *equal_key)
1123
{
1124
        pos->page_no = vy_page_index_find_page(itr->slice->run, key,
82,807!
1125
                                               itr->cmp_def, iterator_type,
1126
                                               equal_key);
1127
        if (pos->page_no == itr->slice->run->info.page_count)
82,807✔
1128
                return 1;
82,806✔
1129
        bool equal_in_page;
1130
        struct vy_page *page;
1131
        int rc = vy_run_iterator_load_page(itr, pos->page_no, key,
82,360!
1132
                                           iterator_type, &page,
1133
                                           &pos->pos_in_page, &equal_in_page);
1134
        if (rc != 0)
82,359!
1135
                return rc;
×
1136
        if (pos->pos_in_page == page->row_count) {
82,359✔
1137
                pos->page_no++;
10,774✔
1138
                pos->pos_in_page = 0;
10,774✔
1139
        } else {
1140
                *equal_key = equal_in_page;
71,585✔
1141
        }
1142
        return 0;
82,359✔
1143
}
1144

1145
/**
1146
 * Increment (or decrement, depending on the order) the current
1147
 * wide position.
1148
 * @retval 0 success, set *pos to new value
1149
 * @retval 1 EOF
1150
 * Affects: curr_loaded_page
1151
 */
1152
static NODISCARD int
1153
vy_run_iterator_next_pos(struct vy_run_iterator *itr,
276,938✔
1154
                         enum iterator_type iterator_type,
1155
                         struct vy_run_iterator_pos *pos)
1156
{
1157
        struct vy_run *run = itr->slice->run;
276,938✔
1158
        *pos = itr->curr_pos;
276,938✔
1159
        if (iterator_type == ITER_LE || iterator_type == ITER_LT) {
276,938✔
1160
                assert(pos->page_no <= run->info.page_count);
54,532!
1161
                if (pos->pos_in_page > 0) {
54,532✔
1162
                        pos->pos_in_page--;
53,052✔
1163
                } else {
1164
                        if (pos->page_no == 0)
1,480✔
1165
                                return 1;
140✔
1166
                        pos->page_no--;
1,340✔
1167
                        struct vy_page_info *page_info =
1168
                                vy_run_page_info(run, pos->page_no);
1,340✔
1169
                        assert(page_info->row_count > 0);
1,340!
1170
                        pos->pos_in_page = page_info->row_count - 1;
1,340✔
1171
                }
1172
        } else {
1173
                assert(iterator_type == ITER_GE || iterator_type == ITER_GT ||
222,406!
1174
                       iterator_type == ITER_EQ);
1175
                assert(pos->page_no < run->info.page_count);
222,406!
1176
                struct vy_page_info *page_info =
1177
                        vy_run_page_info(run, pos->page_no);
222,406✔
1178
                assert(page_info->row_count > 0);
222,406!
1179
                pos->pos_in_page++;
222,406✔
1180
                if (pos->pos_in_page >= page_info->row_count) {
222,406✔
1181
                        pos->page_no++;
15,534✔
1182
                        pos->pos_in_page = 0;
15,534✔
1183
                        if (pos->page_no == run->info.page_count)
15,534✔
1184
                                return 1;
1,110✔
1185
                }
1186
        }
1187
        return 0;
275,688✔
1188
}
1189

1190
/**
1191
 * Find the next record with lsn <= itr->lsn record.
1192
 * The current position must be at the beginning of a series of
1193
 * records with the same key it terms of direction of iterator
1194
 * (i.e. left for GE, right for LE).
1195
 * @retval 0 success or EOF (*ret == NULL)
1196
 * @retval -1 read or memory error
1197
 * Affects: curr_loaded_page, curr_pos
1198
 */
1199
static NODISCARD int
1200
vy_run_iterator_find_lsn(struct vy_run_iterator *itr, struct vy_entry *ret)
294,130✔
1201
{
1202
        struct vy_slice *slice = itr->slice;
294,130✔
1203
        struct key_def *cmp_def = itr->cmp_def;
294,130✔
1204

1205
        *ret = vy_entry_none();
294,130✔
1206

1207
        assert(itr->search_started);
294,130!
1208
        assert(itr->curr.stmt != NULL);
294,130!
1209
        assert(itr->curr_pos.page_no < slice->run->info.page_count);
294,130!
1210

1211
        while (vy_stmt_lsn(itr->curr.stmt) > (**itr->read_view).vlsn ||
310,322✔
1212
               vy_stmt_flags(itr->curr.stmt) & VY_STMT_SKIP_READ) {
297,609✔
1213
                if (vy_run_iterator_next_pos(itr, itr->iterator_type,
16,241✔
1214
                                             &itr->curr_pos) != 0) {
1215
                        vy_run_iterator_stop(itr);
21✔
1216
                        return 0;
21✔
1217
                }
1218
                tuple_unref(itr->curr.stmt);
16,220✔
1219
                itr->curr = vy_entry_none();
16,220✔
1220
                if (vy_run_iterator_read(itr, itr->curr_pos, &itr->curr) != 0)
16,220!
1221
                        return -1;
×
1222
                if (itr->iterator_type == ITER_EQ &&
16,220✔
1223
                    vy_entry_compare(itr->curr, itr->key, cmp_def) != 0) {
119✔
1224
                        vy_run_iterator_stop(itr);
28✔
1225
                        return 0;
28✔
1226
                }
1227
        }
1228
        if (itr->iterator_type == ITER_LE || itr->iterator_type == ITER_LT) {
294,081✔
1229
                struct vy_run_iterator_pos test_pos;
1230
                while (vy_run_iterator_next_pos(itr, itr->iterator_type,
19,223!
1231
                                                &test_pos) == 0) {
1232
                        struct vy_entry test;
1233
                        if (vy_run_iterator_read(itr, test_pos, &test) != 0)
19,114!
1234
                                return -1;
×
1235
                        if (vy_stmt_lsn(test.stmt) > (**itr->read_view).vlsn ||
19,114!
1236
                            vy_stmt_flags(test.stmt) & VY_STMT_SKIP_READ ||
19,084!
1237
                            vy_entry_compare(itr->curr, test, cmp_def) != 0) {
19,011!
1238
                                tuple_unref(test.stmt);
19,052!
1239
                                break;
19,052✔
1240
                        }
1241
                        tuple_unref(itr->curr.stmt);
62!
1242
                        itr->curr = test;
62✔
1243
                        itr->curr_pos = test_pos;
62✔
1244
                }
1245
        }
1246
        /* Check if the result is within the slice boundaries. */
1247
        if (itr->iterator_type == ITER_LE || itr->iterator_type == ITER_LT) {
294,081✔
1248
                if (slice->begin.stmt != NULL &&
19,161✔
1249
                    vy_entry_compare(itr->curr, slice->begin, cmp_def) < 0) {
3,455✔
1250
                        vy_run_iterator_stop(itr);
42✔
1251
                        return 0;
42✔
1252
                }
1253
        } else {
1254
                assert(itr->iterator_type == ITER_GE ||
274,920!
1255
                       itr->iterator_type == ITER_GT ||
1256
                       itr->iterator_type == ITER_EQ);
1257
                if (slice->end.stmt != NULL &&
274,920✔
1258
                    vy_entry_compare(itr->curr, slice->end, cmp_def) >= 0) {
11,143✔
1259
                        vy_run_iterator_stop(itr);
44✔
1260
                        return 0;
44✔
1261
                }
1262
        }
1263
        vy_stmt_counter_acct_tuple(&itr->stat->get, itr->curr.stmt);
293,995✔
1264
        *ret = itr->curr;
293,995✔
1265
        return 0;
293,995✔
1266
}
1267

1268
/**
1269
 * Helper function for vy_run_iterator_seek().
1270
 *
1271
 * Positions the iterator to the beginning (i.e. leftmost for GE,
1272
 * rightmost for LE) of a series of statements matching the given
1273
 * search criteria.
1274
 *
1275
 * Updates itr->curr_pos. Doesn't affect itr->curr.
1276
 *
1277
 * @retval 0 success
1278
 * @retval 1 EOF
1279
 * @retval -1 read or memory error
1280
 */
1281
static NODISCARD int
1282
vy_run_iterator_do_seek(struct vy_run_iterator *itr,
83,846✔
1283
                        enum iterator_type iterator_type, struct vy_entry key)
1284
{
1285
        struct vy_run *run = itr->slice->run;
83,846✔
1286
        struct vy_run_iterator_pos end_pos = {run->info.page_count, 0};
83,846✔
1287
        bool equal_found = false;
83,846✔
1288
        if (!vy_stmt_is_empty_key(key.stmt)) {
83,846!
1289
                int rc = vy_run_iterator_search(itr, iterator_type, key,
82,807!
1290
                                                &itr->curr_pos, &equal_found);
1291
                if (rc != 0)
82,806✔
1292
                        return rc;
83,845✔
1293
        } else if (iterator_type == ITER_LE) {
1,039✔
1294
                itr->curr_pos = end_pos;
354✔
1295
        } else {
1296
                assert(iterator_type == ITER_GE);
685!
1297
                itr->curr_pos.page_no = 0;
685✔
1298
                itr->curr_pos.pos_in_page = 0;
685✔
1299
        }
1300
        if (iterator_type == ITER_EQ && !equal_found)
83,398✔
1301
                return 1;
15,802✔
1302
        if ((iterator_type == ITER_GE || iterator_type == ITER_GT) &&
67,596✔
1303
            itr->curr_pos.page_no == end_pos.page_no)
5,141✔
1304
                return 1;
1,225✔
1305
        if (iterator_type == ITER_LT || iterator_type == ITER_LE) {
66,371✔
1306
                /**
1307
                 * 1) in case of ITER_LT we now positioned on the value >= than
1308
                 * given, so we need to make a step on previous key
1309
                 * 2) in case if ITER_LE we now positioned on the value > than
1310
                 * given (special branch of code in vy_run_iterator_search),
1311
                 * so we need to make a step on previous key
1312
                 */
1313
                return vy_run_iterator_next_pos(itr, iterator_type,
2,613!
1314
                                                &itr->curr_pos);
1315
        } else {
1316
                assert(iterator_type == ITER_GE || iterator_type == ITER_GT ||
63,758!
1317
                       iterator_type == ITER_EQ);
1318
                /**
1319
                 * 1) in case of ITER_GT we now positioned on the value > than
1320
                 * given (special branch of code in vy_run_iterator_search),
1321
                 * so we need just to find proper lsn
1322
                 * 2) in case if ITER_GE or ITER_EQ we now positioned on the
1323
                 * value >= given, so we need just to find proper lsn
1324
                 */
1325
                return 0;
63,758✔
1326
        }
1327
}
1328

1329
/**
1330
 * Position the iterator to the first statement satisfying
1331
 * the iterator search criteria and following the given key
1332
 * (pass NULL to start iteration).
1333
 */
1334
static NODISCARD int
1335
vy_run_iterator_seek(struct vy_run_iterator *itr, struct vy_entry last,
444,925✔
1336
                     struct vy_entry *ret)
1337
{
1338
        struct key_def *cmp_def = itr->cmp_def;
444,925✔
1339
        struct vy_slice *slice = itr->slice;
444,925✔
1340
        struct tuple_bloom *bloom = slice->run->info.bloom;
444,925✔
1341
        struct vy_entry key = itr->key;
444,925✔
1342
        enum iterator_type iterator_type = itr->iterator_type;
444,925✔
1343

1344
        *ret = vy_entry_none();
444,925!
1345
        assert(itr->search_started);
444,925!
1346

1347
        /* Check the bloom filter on the first iteration. */
1348
        bool check_bloom = (itr->iterator_type == ITER_EQ &&
1,327,600✔
1349
                            itr->curr.stmt == NULL && bloom != NULL);
444,925✔
1350
        if (check_bloom && !vy_bloom_maybe_has(bloom, itr->key, itr->key_def)) {
444,925!
1351
                vy_run_iterator_stop(itr);
361,079!
1352
                itr->stat->bloom_hit++;
361,079✔
1353
                return 0;
444,923✔
1354
        }
1355

1356
        /*
1357
         * vy_run_iterator_do_seek() implements its own EQ check.
1358
         * We only need to check EQ here if iterator type and key
1359
         * passed to it differ from the original.
1360
         */
1361
        bool check_eq = false;
83,846✔
1362

1363
        /*
1364
         * Modify iterator type and key so as to position it to
1365
         * the first statement following the given key.
1366
         */
1367
        if (last.stmt != NULL) {
83,846✔
1368
                if (iterator_type == ITER_EQ)
3,761✔
1369
                        check_eq = true;
859✔
1370
                iterator_type = iterator_direction(iterator_type) > 0 ?
3,761!
1371
                                ITER_GT : ITER_LT;
3,761✔
1372
                key = last;
3,761✔
1373
        }
1374

1375
        /* Take slice boundaries into account. */
1376
        if (slice->begin.stmt != NULL &&
83,846✔
1377
            (iterator_type == ITER_GT || iterator_type == ITER_GE ||
16,232!
1378
             iterator_type == ITER_EQ)) {
1379
                /*
1380
                 *    original   |     start
1381
                 * --------------+-------+-----+
1382
                 *   KEY   | DIR |  KEY  | DIR |
1383
                 * --------+-----+-------+-----+
1384
                 * > begin | *   | key   | *   |
1385
                 * = begin | gt  | key   | gt  |
1386
                 *         | ge  | begin | ge  |
1387
                 *         | eq  | begin | ge  |
1388
                 * < begin | gt  | begin | ge  |
1389
                 *         | ge  | begin | ge  |
1390
                 *         | eq  |    stop     |
1391
                 */
1392
                int cmp = vy_entry_compare(key, slice->begin, cmp_def);
15,948!
1393
                if (cmp < 0 && iterator_type == ITER_EQ) {
15,948!
1394
                        vy_run_iterator_stop(itr);
×
1395
                        return 0;
×
1396
                }
1397
                if (cmp < 0 || (cmp == 0 && iterator_type != ITER_GT)) {
15,948!
1398
                        if (iterator_type == ITER_EQ)
213✔
1399
                                check_eq = true;
164✔
1400
                        iterator_type = ITER_GE;
213✔
1401
                        key = slice->begin;
213✔
1402
                }
1403
        }
1404
        if (slice->end.stmt != NULL &&
83,846✔
1405
            (iterator_type == ITER_LT || iterator_type == ITER_LE)) {
15,779✔
1406
                /*
1407
                 *    original   |     start
1408
                 * --------------+-------+-----+
1409
                 *   KEY   | DIR |  KEY  | DIR |
1410
                 * --------+-----+-------+-----+
1411
                 * < end   | *   | key   | *   |
1412
                 * = end   | lt  | key   | lt  |
1413
                 *         | le  | end   | lt  |
1414
                 * > end   | lt  | end   | lt  |
1415
                 *         | le  | end   | lt  |
1416
                 */
1417
                int cmp = vy_entry_compare(key, slice->end, cmp_def);
482!
1418
                if (cmp > 0 || (cmp == 0 && iterator_type != ITER_LT)) {
482!
1419
                        iterator_type = ITER_LT;
×
1420
                        key = slice->end;
×
1421
                }
1422
        }
1423

1424
        /* Perform a lookup in the run. */
1425
        itr->stat->lookup++;
83,846✔
1426
        int rc = vy_run_iterator_do_seek(itr, iterator_type, key);
83,846!
1427
        if (rc < 0)
83,845!
1428
                return -1;
×
1429
        if (rc > 0)
83,845✔
1430
                goto not_found;
17,474✔
1431

1432
        /* Load the found statement. */
1433
        if (itr->curr.stmt != NULL) {
66,371✔
1434
                tuple_unref(itr->curr.stmt);
1,062!
1435
                itr->curr = vy_entry_none();
1,062!
1436
        }
1437
        if (vy_run_iterator_read(itr, itr->curr_pos, &itr->curr) != 0)
66,371!
1438
                return -1;
7✔
1439

1440
        /* Check EQ constraint if necessary. */
1441
        if (check_eq && vy_entry_compare(itr->curr, itr->key,
66,363!
1442
                                         itr->cmp_def) != 0)
1443
                goto not_found;
135✔
1444

1445
        /* Skip statements invisible from the iterator read view. */
1446
        return vy_run_iterator_find_lsn(itr, ret);
66,228!
1447

1448
not_found:
17,609✔
1449
        if (check_bloom)
17,609✔
1450
                itr->stat->bloom_miss++;
15,899✔
1451
        vy_run_iterator_stop(itr);
17,609!
1452
        return 0;
17,609✔
1453
}
1454

1455
/* }}} vy_run_iterator support functions */
1456

1457
/* {{{ vy_run_iterator API implementation */
1458

1459
void
1460
vy_run_iterator_open(struct vy_run_iterator *itr,
610,191✔
1461
                     struct vy_run_iterator_stat *stat,
1462
                     struct vy_slice *slice, enum iterator_type iterator_type,
1463
                     struct vy_entry key, const struct vy_read_view **rv,
1464
                     struct key_def *cmp_def, struct key_def *key_def,
1465
                     struct tuple_format *format)
1466
{
1467
        itr->stat = stat;
610,191✔
1468
        itr->cmp_def = cmp_def;
610,191✔
1469
        itr->key_def = key_def;
610,191✔
1470
        itr->format = format;
610,191✔
1471
        itr->slice = slice;
610,191✔
1472

1473
        itr->iterator_type = iterator_type;
610,191✔
1474
        itr->key = key;
610,191✔
1475
        itr->read_view = rv;
610,191✔
1476

1477
        itr->curr = vy_entry_none();
610,191✔
1478
        itr->curr_pos.page_no = slice->run->info.page_count;
610,191✔
1479
        itr->curr_page = NULL;
610,191✔
1480
        itr->prev_page = NULL;
610,191✔
1481
        itr->search_started = false;
610,191✔
1482

1483
        /*
1484
         * Make sure the format we use to create tuples won't
1485
         * go away if DDL is called while the iterator is used.
1486
         *
1487
         * XXX: Please remove this kludge when proper DDL locking
1488
         * is implemented on transaction management level or multi
1489
         * version data dictionary is in place.
1490
         */
1491
        tuple_format_ref(format);
610,191✔
1492
}
610,191✔
1493

1494
/**
1495
 * Advance a run iterator to the newest statement for the next key.
1496
 * The statement is returned in @ret (NULL if EOF).
1497
 * Returns 0 on success, -1 on memory allocation or IO error.
1498
 */
1499
static NODISCARD int
1500
vy_run_iterator_next_key(struct vy_run_iterator *itr, struct vy_entry *ret)
645,148✔
1501
{
1502
        *ret = vy_entry_none();
645,148!
1503

1504
        if (!itr->search_started) {
645,148✔
1505
                itr->search_started = true;
410,343✔
1506
                return vy_run_iterator_seek(itr, vy_entry_none(), ret);
645,148!
1507
        }
1508
        if (itr->curr.stmt == NULL)
234,805✔
1509
                return 0;
10✔
1510

1511
        assert(itr->curr_pos.page_no < itr->slice->run->info.page_count);
234,795!
1512

1513
        struct vy_entry next = vy_entry_none();
234,795!
1514
        do {
1515
                if (next.stmt != NULL)
234,845✔
1516
                        tuple_unref(next.stmt);
50!
1517
                if (vy_run_iterator_next_pos(itr, itr->iterator_type,
234,845!
1518
                                             &itr->curr_pos) != 0) {
1519
                        vy_run_iterator_stop(itr);
1,107!
1520
                        return 0;
1,107✔
1521
                }
1522

1523
                if (vy_run_iterator_read(itr, itr->curr_pos, &next) != 0)
233,738!
1524
                        return -1;
×
1525
        } while (vy_entry_compare(itr->curr, next, itr->cmp_def) == 0);
233,738!
1526

1527
        tuple_unref(itr->curr.stmt);
233,688!
1528
        itr->curr = next;
233,688✔
1529

1530
        if (itr->iterator_type == ITER_EQ &&
233,688✔
1531
            vy_entry_compare(next, itr->key, itr->cmp_def) != 0) {
24,309!
1532
                vy_run_iterator_stop(itr);
5,786!
1533
                return 0;
5,786✔
1534
        }
1535
        return vy_run_iterator_find_lsn(itr, ret);
227,902!
1536
}
1537

1538
/**
1539
 * Advance a run iterator to the next (older) statement for the
1540
 * current key. The statement is returned in @ret (NULL if EOF).
1541
 * Returns 0 on success, -1 on memory allocation or IO error.
1542
 */
1543
static NODISCARD int
1544
vy_run_iterator_next_lsn(struct vy_run_iterator *itr, struct vy_entry *ret)
4,016✔
1545
{
1546
        *ret = vy_entry_none();
4,016!
1547

1548
        assert(itr->search_started);
4,016!
1549
        assert(itr->curr.stmt != NULL);
4,016!
1550
        assert(itr->curr_pos.page_no < itr->slice->run->info.page_count);
4,016!
1551

1552
        struct vy_run_iterator_pos next_pos;
1553
next:
4,016✔
1554
        if (vy_run_iterator_next_pos(itr, ITER_GE, &next_pos) != 0) {
4,016!
1555
                vy_run_iterator_stop(itr);
13!
1556
                return 0;
4,016✔
1557
        }
1558

1559
        struct vy_entry next;
1560
        if (vy_run_iterator_read(itr, next_pos, &next) != 0)
4,003!
1561
                return -1;
×
1562

1563
        if (vy_entry_compare(itr->curr, next, itr->cmp_def) != 0) {
4,003!
1564
                tuple_unref(next.stmt);
4,003!
1565
                return 0;
4,003✔
1566
        }
1567

1568
        tuple_unref(itr->curr.stmt);
×
1569
        itr->curr = next;
×
1570
        itr->curr_pos = next_pos;
×
1571
        if (vy_stmt_flags(itr->curr.stmt) & VY_STMT_SKIP_READ)
×
1572
                goto next;
×
1573

1574
        vy_stmt_counter_acct_tuple(&itr->stat->get, itr->curr.stmt);
×
1575
        *ret = itr->curr;
×
1576
        return 0;
×
1577
}
1578

1579
NODISCARD int
1580
vy_run_iterator_next(struct vy_run_iterator *itr,
645,148✔
1581
                     struct vy_history *history)
1582
{
1583
        vy_history_cleanup(history);
645,148!
1584
        struct vy_entry entry;
1585
        if (vy_run_iterator_next_key(itr, &entry) != 0)
645,148!
1586
                return -1;
645,147✔
1587
        while (entry.stmt != NULL) {
649,153✔
1588
                if (vy_history_append_stmt(history, entry) != 0)
279,427!
1589
                        return -1;
×
1590
                if (vy_history_is_terminal(history))
279,427!
1591
                        break;
275,421✔
1592
                if (vy_run_iterator_next_lsn(itr, &entry) != 0)
4,006!
1593
                        return -1;
×
1594
        }
1595
        return 0;
645,147✔
1596
}
1597

1598
NODISCARD int
1599
vy_run_iterator_skip(struct vy_run_iterator *itr, struct vy_entry last,
35,479✔
1600
                     struct vy_history *history)
1601
{
1602
        /*
1603
         * Check if the iterator is already positioned
1604
         * at the statement following last.
1605
         */
1606
        if (itr->search_started &&
35,479✔
1607
            (itr->curr.stmt == NULL || last.stmt == NULL ||
1,985!
1608
             iterator_direction(itr->iterator_type) *
1,699!
1609
             vy_entry_compare(itr->curr, last, itr->cmp_def) > 0))
1,699!
1610
                return 0;
35,478✔
1611

1612
        vy_history_cleanup(history);
34,582!
1613

1614
        itr->search_started = true;
34,582✔
1615
        struct vy_entry entry;
1616
        if (vy_run_iterator_seek(itr, last, &entry) != 0)
34,582!
1617
                return -1;
7✔
1618

1619
        while (entry.stmt != NULL) {
34,584✔
1620
                if (vy_history_append_stmt(history, entry) != 0)
14,568!
1621
                        return -1;
×
1622
                if (vy_history_is_terminal(history))
14,568!
1623
                        break;
14,558✔
1624
                if (vy_run_iterator_next_lsn(itr, &entry) != 0)
10!
1625
                        return -1;
×
1626
        }
1627
        return 0;
34,574✔
1628
}
1629

1630
void
1631
vy_run_iterator_close(struct vy_run_iterator *itr)
610,187✔
1632
{
1633
        vy_run_iterator_stop(itr);
610,187✔
1634
        tuple_format_unref(itr->format);
610,187✔
1635
        TRASH(itr);
610,187✔
1636
}
610,187✔
1637

1638
/* }}} vy_run_iterator API implementation */
1639

1640
/** Account a page to run statistics. */
1641
static void
1642
vy_run_acct_page(struct vy_run *run, struct vy_page_info *page)
157,130✔
1643
{
1644
        const char *min_key_end = page->min_key;
157,130✔
1645
        mp_next(&min_key_end);
157,130!
1646
        run->page_index_size += sizeof(struct vy_page_info);
157,130✔
1647
        run->page_index_size += min_key_end - page->min_key;
157,130✔
1648
        run->count.rows += page->row_count;
157,130✔
1649
        run->count.bytes += page->unpacked_size;
157,130✔
1650
        run->count.bytes_compressed += page->size;
157,130✔
1651
        run->count.pages++;
157,130✔
1652
}
157,130✔
1653

1654
int
1655
vy_run_recover(struct vy_run *run, const char *dir,
283✔
1656
               uint32_t space_id, uint32_t iid, struct key_def *cmp_def)
1657
{
1658
        char path[PATH_MAX];
1659
        vy_run_snprint_path(path, sizeof(path), dir,
283!
1660
                            space_id, iid, run->id, VY_FILE_INDEX);
1661

1662
        struct xlog_cursor cursor;
1663
        ERROR_INJECT_COUNTDOWN(ERRINJ_VY_RUN_OPEN, {
283!
1664
                diag_set(SystemError, "failed to open '%s' file", path);
1665
                goto fail;
1666
        });
1667
        if (xlog_cursor_open(&cursor, path))
282!
1668
                goto fail;
5✔
1669

1670
        struct xlog_meta *meta = &cursor.meta;
277✔
1671
        if (strcmp(meta->filetype, XLOG_META_TYPE_INDEX) != 0) {
277!
1672
                diag_set(ClientError, ER_INVALID_XLOG_TYPE,
×
1673
                         XLOG_META_TYPE_INDEX, meta->filetype);
1674
                goto fail_close;
×
1675
        }
1676

1677
        /* Read run header. */
1678
        struct xrow_header xrow;
1679
        /* all rows should be in one tx */
1680
        int rc = xlog_cursor_next_tx(&cursor);
277!
1681

1682
        if (rc != 0) {
277!
1683
                if (rc > 0)
×
1684
                        diag_set(ClientError, ER_INVALID_INDEX_FILE,
×
1685
                                 path, "Unexpected end of file");
1686
                goto fail_close;
×
1687
        }
1688
        rc = xlog_cursor_next_row(&cursor, &xrow);
277!
1689
        if (rc != 0) {
277!
1690
                if (rc > 0)
×
1691
                        diag_set(ClientError, ER_INVALID_INDEX_FILE,
×
1692
                                 path, "Unexpected end of file");
1693
                goto fail_close;
×
1694
        }
1695

1696
        if (xrow.type != VY_INDEX_RUN_INFO) {
277!
1697
                diag_set(ClientError, ER_INVALID_INDEX_FILE, path,
×
1698
                         tt_sprintf("Wrong xrow type (expected %d, got %u)",
1699
                                    VY_INDEX_RUN_INFO, (unsigned)xrow.type));
1700
                goto fail_close;
×
1701
        }
1702

1703
        if (vy_run_info_decode(&run->info, &xrow, path) != 0)
277!
1704
                goto fail_close;
×
1705

1706
        /* Allocate buffer for page info. */
1707
        run->page_info = calloc(run->info.page_count,
277✔
1708
                                      sizeof(struct vy_page_info));
1709
        if (run->page_info == NULL) {
277!
1710
                diag_set(OutOfMemory,
×
1711
                         run->info.page_count * sizeof(struct vy_page_info),
1712
                         "malloc", "struct vy_page_info");
1713
                goto fail_close;
×
1714
        }
1715

1716
        for (uint32_t page_no = 0; page_no < run->info.page_count; page_no++) {
16,541✔
1717
                int rc = xlog_cursor_next_row(&cursor, &xrow);
16,264!
1718
                if (rc != 0) {
16,264!
1719
                        if (rc > 0) {
×
1720
                                /** To few pages in file */
1721
                                diag_set(ClientError, ER_INVALID_INDEX_FILE,
×
1722
                                         path, "Unexpected end of file");
1723
                        }
1724
                        /*
1725
                         * Limit the count of pages to
1726
                         * successfully created pages.
1727
                         */
1728
                        run->info.page_count = page_no;
×
1729
                        goto fail_close;
×
1730
                }
1731
                if (xrow.type != VY_INDEX_PAGE_INFO) {
16,264!
1732
                        diag_set(ClientError, ER_INVALID_INDEX_FILE,
×
1733
                                 tt_sprintf("Wrong xrow type "
1734
                                            "(expected %d, got %u)",
1735
                                            VY_INDEX_PAGE_INFO,
1736
                                            (unsigned)xrow.type));
1737
                        goto fail_close;
×
1738
                }
1739
                struct vy_page_info *page = run->page_info + page_no;
16,264✔
1740
                if (vy_page_info_decode(page, &xrow, cmp_def, path) < 0) {
16,264!
1741
                        /**
1742
                         * Limit the count of pages to successfully
1743
                         * created pages
1744
                         */
1745
                        run->info.page_count = page_no;
×
1746
                        goto fail_close;
×
1747
                }
1748
                vy_run_acct_page(run, page);
16,264!
1749
        }
1750

1751
        /* We don't need to keep metadata file open any longer. */
1752
        xlog_cursor_close(&cursor, false);
277!
1753

1754
        /* Prepare data file for reading. */
1755
        vy_run_snprint_path(path, sizeof(path), dir,
277!
1756
                            space_id, iid, run->id, VY_FILE_RUN);
1757
        if (xlog_cursor_open(&cursor, path))
277!
1758
                goto fail;
×
1759
        meta = &cursor.meta;
277✔
1760
        if (strcmp(meta->filetype, XLOG_META_TYPE_RUN) != 0) {
277!
1761
                diag_set(ClientError, ER_INVALID_XLOG_TYPE,
×
1762
                         XLOG_META_TYPE_RUN, meta->filetype);
1763
                goto fail_close;
×
1764
        }
1765
        run->fd = cursor.fd;
277✔
1766
        xlog_cursor_close(&cursor, true);
277!
1767
        return 0;
283✔
1768

1769
fail_close:
×
1770
        xlog_cursor_close(&cursor, false);
×
1771
fail:
6✔
1772
        vy_run_clear(run);
6!
1773
        diag_log();
6!
1774
        say_error("failed to load `%s'", path);
6!
1775
        return -1;
6✔
1776
}
1777

1778
/* dump statement to the run page buffers (stmt header and data) */
1779
static int
1780
vy_run_dump_stmt(struct vy_entry entry, struct xlog *data_xlog,
5,523,600✔
1781
                 struct vy_page_info *info, struct key_def *key_def,
1782
                 bool is_primary)
1783
{
1784
        struct xrow_header xrow;
1785
        int rc = (is_primary ?
5,523,600✔
1786
                  vy_stmt_encode_primary(entry.stmt, key_def, 0, &xrow) :
5,523,600!
1787
                  vy_stmt_encode_secondary(entry.stmt, key_def,
4,831,340!
1788
                                           vy_entry_multikey_idx(entry, key_def),
1789
                                           &xrow));
1790
        if (rc != 0)
5,523,600!
1791
                return -1;
5,523,600✔
1792

1793
        ssize_t row_size;
1794
        if ((row_size = xlog_write_row(data_xlog, &xrow)) < 0)
5,523,600!
1795
                return -1;
×
1796

1797
        info->unpacked_size += row_size;
5,523,600✔
1798
        info->row_count++;
5,523,600✔
1799
        return 0;
5,523,600✔
1800
}
1801

1802
/**
1803
 * Encode uint32_t array of row offsets (row index) as xrow
1804
 *
1805
 * @param row_index row index
1806
 * @param row_count size of row index
1807
 * @param[out] xrow xrow to fill.
1808
 * @retval 0 for success
1809
 * @retval -1 for error
1810
 */
1811
static int
1812
vy_row_index_encode(const uint32_t *row_index, uint32_t row_count,
140,200✔
1813
                    struct xrow_header *xrow)
1814
{
1815
        memset(xrow, 0, sizeof(*xrow));
140,200✔
1816
        xrow->type = VY_RUN_ROW_INDEX;
140,200✔
1817

1818
        size_t size = mp_sizeof_map(1) +
140,200✔
1819
                      mp_sizeof_uint(VY_ROW_INDEX_DATA) +
140,200✔
1820
                      mp_sizeof_bin(sizeof(uint32_t) * row_count);
140,200✔
1821
        char *pos = region_alloc(&fiber()->gc, size);
140,200!
1822
        if (pos == NULL) {
140,200!
1823
                diag_set(OutOfMemory, size, "region", "row index");
×
1824
                return -1;
×
1825
        }
1826
        xrow->body->iov_base = pos;
140,200✔
1827
        pos = mp_encode_map(pos, 1);
140,200✔
1828
        pos = mp_encode_uint(pos, VY_ROW_INDEX_DATA);
140,200✔
1829
        pos = mp_encode_binl(pos, sizeof(uint32_t) * row_count);
140,200✔
1830
        for (uint32_t i = 0; i < row_count; ++i)
5,663,460✔
1831
                pos = mp_store_u32(pos, row_index[i]);
5,523,260✔
1832
        xrow->body->iov_len = (void *)pos - xrow->body->iov_base;
140,200✔
1833
        assert(xrow->body->iov_len == size);
140,200!
1834
        xrow->bodycnt = 1;
140,200✔
1835
        return 0;
140,200✔
1836
}
1837

1838
/**
1839
 * Helper to extend run page info array
1840
 */
1841
static inline int
1842
vy_run_alloc_page_info(struct vy_run *run, uint32_t *page_info_capacity)
6,150✔
1843
{
1844
        uint32_t cap = *page_info_capacity > 0 ?
12,300✔
1845
                       *page_info_capacity * 2 : 16;
6,150✔
1846
        struct vy_page_info *page_info = realloc(run->page_info,
6,150✔
1847
                                        cap * sizeof(*page_info));
1848
        if (page_info == NULL) {
6,150!
1849
                diag_set(OutOfMemory, cap * sizeof(*page_info),
×
1850
                         "realloc", "struct vy_page_info");
1851
                return -1;
×
1852
        }
1853
        run->page_info = page_info;
6,150✔
1854
        *page_info_capacity = cap;
6,150✔
1855
        return 0;
6,150✔
1856
}
1857

1858
/** {{{ vy_page_info */
1859

1860
/**
1861
 * Encode vy_page_info as xrow.
1862
 * Allocates using region_alloc.
1863
 *
1864
 * @param page_info page information to encode
1865
 * @param[out] xrow xrow to fill
1866
 *
1867
 * @retval  0 success
1868
 * @retval -1 error, check diag
1869
 */
1870
static int
1871
vy_page_info_encode(const struct vy_page_info *page_info,
140,313✔
1872
                    struct xrow_header *xrow)
1873
{
1874
        struct region *region = &fiber()->gc;
140,313!
1875

1876
        uint32_t min_key_size;
1877
        const char *tmp = page_info->min_key;
140,313✔
1878
        assert(mp_typeof(*tmp) == MP_ARRAY);
280,626!
1879
        mp_next(&tmp);
140,313!
1880
        min_key_size = tmp - page_info->min_key;
140,313✔
1881

1882
        /* calc tuple size */
1883
        uint32_t size;
1884
        /* 3 items: page offset, size, and map */
1885
        size = mp_sizeof_map(6) +
140,313!
1886
               mp_sizeof_uint(VY_PAGE_INFO_OFFSET) +
140,313!
1887
               mp_sizeof_uint(page_info->offset) +
140,313!
1888
               mp_sizeof_uint(VY_PAGE_INFO_SIZE) +
140,313!
1889
               mp_sizeof_uint(page_info->size) +
140,313!
1890
               mp_sizeof_uint(VY_PAGE_INFO_ROW_COUNT) +
140,313!
1891
               mp_sizeof_uint(page_info->row_count) +
140,313!
1892
               mp_sizeof_uint(VY_PAGE_INFO_MIN_KEY) +
140,313!
1893
               min_key_size +
140,313✔
1894
               mp_sizeof_uint(VY_PAGE_INFO_UNPACKED_SIZE) +
140,313!
1895
               mp_sizeof_uint(page_info->unpacked_size) +
140,313!
1896
               mp_sizeof_uint(VY_PAGE_INFO_ROW_INDEX_OFFSET) +
140,313!
1897
               mp_sizeof_uint(page_info->row_index_offset);
140,313!
1898

1899
        char *pos = region_alloc(region, size);
140,313!
1900
        if (pos == NULL) {
140,313!
1901
                diag_set(OutOfMemory, size, "region", "page encode");
×
1902
                return -1;
140,313✔
1903
        }
1904

1905
        memset(xrow, 0, sizeof(*xrow));
140,313✔
1906
        /* encode page */
1907
        xrow->body->iov_base = pos;
140,313✔
1908
        pos = mp_encode_map(pos, 6);
140,313!
1909
        pos = mp_encode_uint(pos, VY_PAGE_INFO_OFFSET);
140,313!
1910
        pos = mp_encode_uint(pos, page_info->offset);
140,313!
1911
        pos = mp_encode_uint(pos, VY_PAGE_INFO_SIZE);
140,313!
1912
        pos = mp_encode_uint(pos, page_info->size);
140,313!
1913
        pos = mp_encode_uint(pos, VY_PAGE_INFO_ROW_COUNT);
140,313!
1914
        pos = mp_encode_uint(pos, page_info->row_count);
140,313!
1915
        pos = mp_encode_uint(pos, VY_PAGE_INFO_MIN_KEY);
140,313!
1916
        memcpy(pos, page_info->min_key, min_key_size);
140,313✔
1917
        pos += min_key_size;
140,313✔
1918
        pos = mp_encode_uint(pos, VY_PAGE_INFO_UNPACKED_SIZE);
140,313!
1919
        pos = mp_encode_uint(pos, page_info->unpacked_size);
140,313!
1920
        pos = mp_encode_uint(pos, VY_PAGE_INFO_ROW_INDEX_OFFSET);
140,313!
1921
        pos = mp_encode_uint(pos, page_info->row_index_offset);
140,313!
1922
        xrow->body->iov_len = (void *)pos - xrow->body->iov_base;
140,313✔
1923
        xrow->bodycnt = 1;
140,313✔
1924

1925
        xrow->type = VY_INDEX_PAGE_INFO;
140,313✔
1926
        return 0;
140,313✔
1927
}
1928

1929
/** vy_page_info }}} */
1930

1931
/** {{{ vy_run_info */
1932

1933
/** Return the size of encoded statement statistics. */
1934
static size_t
1935
vy_stmt_stat_sizeof(const struct vy_stmt_stat *stat)
3,178✔
1936
{
1937
        return mp_sizeof_map(4) +
3,178✔
1938
                mp_sizeof_uint(IPROTO_INSERT) +
3,178✔
1939
                mp_sizeof_uint(IPROTO_REPLACE) +
3,178✔
1940
                mp_sizeof_uint(IPROTO_DELETE) +
3,178✔
1941
                mp_sizeof_uint(IPROTO_UPSERT) +
3,178✔
1942
                mp_sizeof_uint(stat->inserts) +
3,178✔
1943
                mp_sizeof_uint(stat->replaces) +
3,178✔
1944
                mp_sizeof_uint(stat->deletes) +
3,178✔
1945
                mp_sizeof_uint(stat->upserts);
3,178✔
1946
}
1947

1948
/** Encode statement statistics to @buf and return advanced @buf. */
1949
static char *
1950
vy_stmt_stat_encode(const struct vy_stmt_stat *stat, char *buf)
3,178✔
1951
{
1952
        buf = mp_encode_map(buf, 4);
3,178✔
1953
        buf = mp_encode_uint(buf, IPROTO_INSERT);
3,178✔
1954
        buf = mp_encode_uint(buf, stat->inserts);
3,178✔
1955
        buf = mp_encode_uint(buf, IPROTO_REPLACE);
3,178✔
1956
        buf = mp_encode_uint(buf, stat->replaces);
3,178✔
1957
        buf = mp_encode_uint(buf, IPROTO_DELETE);
3,178✔
1958
        buf = mp_encode_uint(buf, stat->deletes);
3,178✔
1959
        buf = mp_encode_uint(buf, IPROTO_UPSERT);
3,178✔
1960
        buf = mp_encode_uint(buf, stat->upserts);
3,178✔
1961
        return buf;
3,178✔
1962
}
1963

1964
/**
1965
 * Encode vy_run_info as xrow
1966
 * Allocates using region alloc
1967
 *
1968
 * @param run_info the run information
1969
 * @param xrow xrow to fill.
1970
 *
1971
 * @retval  0 success
1972
 * @retval -1 on error, check diag
1973
 */
1974
static int
1975
vy_run_info_encode(const struct vy_run_info *run_info,
3,178✔
1976
                   struct xrow_header *xrow)
1977
{
1978
        const char *tmp;
1979
        tmp = run_info->min_key;
3,178✔
1980
        mp_next(&tmp);
3,178!
1981
        size_t min_key_size = tmp - run_info->min_key;
3,178✔
1982
        tmp = run_info->max_key;
3,178✔
1983
        mp_next(&tmp);
3,178!
1984
        size_t max_key_size = tmp - run_info->max_key;
3,178✔
1985

1986
        uint32_t key_count = 6;
3,178✔
1987
        if (run_info->bloom != NULL)
3,178✔
1988
                key_count++;
3,176✔
1989

1990
        size_t size = mp_sizeof_map(key_count);
3,178!
1991
        size += mp_sizeof_uint(VY_RUN_INFO_MIN_KEY) + min_key_size;
3,178!
1992
        size += mp_sizeof_uint(VY_RUN_INFO_MAX_KEY) + max_key_size;
3,178!
1993
        size += mp_sizeof_uint(VY_RUN_INFO_MIN_LSN) +
3,178!
1994
                mp_sizeof_uint(run_info->min_lsn);
3,178!
1995
        size += mp_sizeof_uint(VY_RUN_INFO_MAX_LSN) +
3,178!
1996
                mp_sizeof_uint(run_info->max_lsn);
3,178!
1997
        size += mp_sizeof_uint(VY_RUN_INFO_PAGE_COUNT) +
3,178!
1998
                mp_sizeof_uint(run_info->page_count);
3,178!
1999
        if (run_info->bloom != NULL)
3,178✔
2000
                size += mp_sizeof_uint(VY_RUN_INFO_BLOOM_FILTER) +
3,176!
2001
                        tuple_bloom_size(run_info->bloom);
3,176!
2002
        size += mp_sizeof_uint(VY_RUN_INFO_STMT_STAT) +
3,178!
2003
                vy_stmt_stat_sizeof(&run_info->stmt_stat);
3,178!
2004

2005
        char *pos = region_alloc(&fiber()->gc, size);
3,178!
2006
        if (pos == NULL) {
3,178!
2007
                diag_set(OutOfMemory, size, "region", "run encode");
×
2008
                return -1;
3,178✔
2009
        }
2010
        memset(xrow, 0, sizeof(*xrow));
3,178✔
2011
        xrow->body->iov_base = pos;
3,178✔
2012
        /* encode values */
2013
        pos = mp_encode_map(pos, key_count);
3,178!
2014
        pos = mp_encode_uint(pos, VY_RUN_INFO_MIN_KEY);
3,178!
2015
        memcpy(pos, run_info->min_key, min_key_size);
3,178✔
2016
        pos += min_key_size;
3,178✔
2017
        pos = mp_encode_uint(pos, VY_RUN_INFO_MAX_KEY);
3,178!
2018
        memcpy(pos, run_info->max_key, max_key_size);
3,178✔
2019
        pos += max_key_size;
3,178✔
2020
        pos = mp_encode_uint(pos, VY_RUN_INFO_MIN_LSN);
3,178!
2021
        pos = mp_encode_uint(pos, run_info->min_lsn);
3,178!
2022
        pos = mp_encode_uint(pos, VY_RUN_INFO_MAX_LSN);
3,178!
2023
        pos = mp_encode_uint(pos, run_info->max_lsn);
3,178!
2024
        pos = mp_encode_uint(pos, VY_RUN_INFO_PAGE_COUNT);
3,178!
2025
        pos = mp_encode_uint(pos, run_info->page_count);
3,178!
2026
        if (run_info->bloom != NULL) {
3,178✔
2027
                pos = mp_encode_uint(pos, VY_RUN_INFO_BLOOM_FILTER);
3,176!
2028
                pos = tuple_bloom_encode(run_info->bloom, pos);
3,176!
2029
        }
2030
        pos = mp_encode_uint(pos, VY_RUN_INFO_STMT_STAT);
3,178!
2031
        pos = vy_stmt_stat_encode(&run_info->stmt_stat, pos);
3,178!
2032
        xrow->body->iov_len = (void *)pos - xrow->body->iov_base;
3,178✔
2033
        xrow->bodycnt = 1;
3,178✔
2034
        xrow->type = VY_INDEX_RUN_INFO;
3,178✔
2035
        return 0;
3,178✔
2036
}
2037

2038
/* vy_run_info }}} */
2039

2040
/**
2041
 * Write run index to file.
2042
 */
2043
static int
2044
vy_run_write_index(struct vy_run *run, const char *dirpath,
3,178✔
2045
                   uint32_t space_id, uint32_t iid)
2046
{
2047
        char path[PATH_MAX];
2048
        vy_run_snprint_path(path, sizeof(path), dirpath,
3,178!
2049
                            space_id, iid, run->id, VY_FILE_INDEX);
2050

2051
        say_info("writing `%s'", path);
3,178!
2052

2053
        struct xlog index_xlog;
2054
        struct xlog_meta meta;
2055
        xlog_meta_create(&meta, XLOG_META_TYPE_INDEX, &INSTANCE_UUID,
3,178!
2056
                         NULL, NULL);
2057
        struct xlog_opts opts = xlog_opts_default;
3,178✔
2058
        opts.rate_limit = run->env->snap_io_rate_limit;
3,178✔
2059
        opts.sync_interval = VY_RUN_SYNC_INTERVAL;
3,178✔
2060
        if (xlog_create(&index_xlog, path, 0, &meta, &opts) < 0)
3,178!
2061
                return -1;
3,176✔
2062

2063
        xlog_tx_begin(&index_xlog);
3,178!
2064
        struct region *region = &fiber()->gc;
3,178!
2065
        size_t mem_used = region_used(region);
3,178!
2066

2067
        struct xrow_header xrow;
2068
        if (vy_run_info_encode(&run->info, &xrow) != 0 ||
3,178!
2069
            xlog_write_row(&index_xlog, &xrow) < 0)
3,178!
2070
                goto fail_rollback;
×
2071

2072
        for (uint32_t page_no = 0; page_no < run->info.page_count; ++page_no) {
143,491✔
2073
                struct vy_page_info *page_info = vy_run_page_info(run, page_no);
140,313!
2074
                if (vy_page_info_encode(page_info, &xrow) < 0) {
140,313!
2075
                        goto fail_rollback;
×
2076
                }
2077
                if (xlog_write_row(&index_xlog, &xrow) < 0)
140,313!
2078
                        goto fail_rollback;
×
2079
        }
2080

2081
        region_truncate(region, mem_used);
3,178!
2082
        if (xlog_tx_commit(&index_xlog) < 0)
3,178!
2083
                goto fail;
×
2084

2085
        ERROR_INJECT(ERRINJ_VY_INDEX_FILE_RENAME, {
3,178!
2086
                diag_set(ClientError, ER_INJECTION, "vinyl index file rename");
2087
                xlog_close(&index_xlog, false);
2088
                return -1;
2089
        });
2090

2091
        if (xlog_flush(&index_xlog) < 0 ||
3,177!
2092
            xlog_rename(&index_xlog) < 0)
3,176!
2093
                goto fail;
×
2094

2095
        xlog_close(&index_xlog, false);
3,176✔
2096
        return 0;
3,175✔
2097

2098
fail_rollback:
×
2099
        region_truncate(region, mem_used);
×
2100
        xlog_tx_rollback(&index_xlog);
×
2101
fail:
×
2102
        xlog_close(&index_xlog, false);
×
2103
        unlink(path);
×
2104
        return -1;
×
2105
}
2106

2107
int
2108
vy_run_writer_create(struct vy_run_writer *writer, struct vy_run *run,
3,271✔
2109
                     const char *dirpath, uint32_t space_id, uint32_t iid,
2110
                     struct key_def *cmp_def, struct key_def *key_def,
2111
                     uint64_t page_size, double bloom_fpr, bool no_compression)
2112
{
2113
        memset(writer, 0, sizeof(*writer));
3,271✔
2114
        writer->run = run;
3,271✔
2115
        writer->dirpath = dirpath;
3,271✔
2116
        writer->space_id = space_id;
3,271✔
2117
        writer->iid = iid;
3,271✔
2118
        writer->cmp_def = cmp_def;
3,271✔
2119
        writer->key_def = key_def;
3,271✔
2120
        writer->page_size = page_size;
3,271✔
2121
        writer->bloom_fpr = bloom_fpr;
3,271✔
2122
        writer->no_compression = no_compression;
3,271✔
2123
        if (bloom_fpr < 1) {
3,271✔
2124
                writer->bloom = tuple_bloom_builder_new(key_def->part_count);
3,269✔
2125
                if (writer->bloom == NULL)
3,269!
2126
                        return -1;
×
2127
        }
2128
        xlog_clear(&writer->data_xlog);
3,271✔
2129
        ibuf_create(&writer->row_index_buf, &cord()->slabc,
3,271!
2130
                    4096 * sizeof(uint32_t));
2131
        run->info.min_lsn = INT64_MAX;
3,271✔
2132
        run->info.max_lsn = -1;
3,271✔
2133
        assert(run->page_info == NULL);
3,271!
2134
        return 0;
3,271✔
2135
}
2136

2137
/**
2138
 * Create an xlog to write run.
2139
 * @param writer Run writer.
2140
 * @retval -1 Memory or IO error.
2141
 * @retval  0 Success.
2142
 */
2143
static int
2144
vy_run_writer_create_xlog(struct vy_run_writer *writer)
3,181✔
2145
{
2146
        assert(!xlog_is_open(&writer->data_xlog));
3,181!
2147
        char path[PATH_MAX];
2148
        vy_run_snprint_path(path, sizeof(path), writer->dirpath,
3,181✔
2149
                            writer->space_id, writer->iid, writer->run->id,
3,181!
2150
                            VY_FILE_RUN);
2151
        say_info("writing `%s'", path);
3,181!
2152
        struct xlog_meta meta;
2153
        xlog_meta_create(&meta, XLOG_META_TYPE_RUN, &INSTANCE_UUID,
3,181!
2154
                         NULL, NULL);
2155
        struct xlog_opts opts = xlog_opts_default;
3,181✔
2156
        opts.rate_limit = writer->run->env->snap_io_rate_limit;
3,181✔
2157
        opts.sync_interval = VY_RUN_SYNC_INTERVAL;
3,181✔
2158
        opts.no_compression = writer->no_compression;
3,181✔
2159
        if (xlog_create(&writer->data_xlog, path, 0, &meta, &opts) != 0)
3,181!
2160
                return -1;
3,181✔
2161
        return 0;
3,181✔
2162
}
2163

2164
/**
2165
 * Start a new page with a min_key stored in @a first_entry.
2166
 * @param writer Run writer.
2167
 * @param first_entry First statement of a page.
2168
 *
2169
 * @retval -1 Memory error.
2170
 * @retval  0 Success.
2171
 */
2172
static int
2173
vy_run_writer_start_page(struct vy_run_writer *writer,
140,203✔
2174
                         struct vy_entry first_entry)
2175
{
2176
        struct vy_run *run = writer->run;
140,203✔
2177
        if (run->info.page_count >= writer->page_info_capacity &&
140,203✔
2178
            vy_run_alloc_page_info(run, &writer->page_info_capacity) != 0)
6,137!
2179
                return -1;
×
2180
        const char *key = vy_stmt_is_key(first_entry.stmt) ?
140,203✔
2181
                          tuple_data(first_entry.stmt) :
140,203✔
2182
                          tuple_extract_key(first_entry.stmt, writer->cmp_def,
74,006✔
2183
                                            vy_entry_multikey_idx(first_entry,
2184
                                                                  writer->cmp_def),
2185
                                            NULL);
2186
        if (key == NULL)
140,203!
2187
                return -1;
×
2188
        if (run->info.page_count == 0) {
140,203✔
2189
                assert(run->info.min_key == NULL);
3,181!
2190
                run->info.min_key = vy_key_dup(key);
3,181✔
2191
                if (run->info.min_key == NULL)
3,181!
2192
                        return -1;
×
2193
        }
2194
        struct vy_page_info *page = run->page_info + run->info.page_count;
140,203✔
2195
        if (vy_page_info_create(page, writer->data_xlog.offset,
140,203!
2196
                                key, writer->cmp_def) != 0)
2197
                return -1;
×
2198
        xlog_tx_begin(&writer->data_xlog);
140,203✔
2199
        return 0;
140,203✔
2200
}
2201

2202
/**
2203
 * Write @a stmt into a current page.
2204
 * @param writer Run writer.
2205
 * @param entry Statement to write.
2206
 *
2207
 * @retval -1 Memory or IO error.
2208
 * @retval  0 Success.
2209
 */
2210
static int
2211
vy_run_writer_write_to_page(struct vy_run_writer *writer, struct vy_entry entry)
5,523,600✔
2212
{
2213
        if (writer->bloom != NULL &&
5,523,600✔
2214
            vy_bloom_builder_add(writer->bloom, entry, writer->key_def) != 0)
5,523,600!
2215
                return -1;
×
2216
        if (writer->last.stmt != NULL)
5,523,600✔
2217
                vy_stmt_unref_if_possible(writer->last.stmt);
5,520,420✔
2218
        writer->last = entry;
5,523,600✔
2219
        vy_stmt_ref_if_possible(entry.stmt);
5,523,600✔
2220
        struct vy_run *run = writer->run;
5,523,600✔
2221
        struct vy_page_info *page = run->page_info + run->info.page_count;
5,523,600✔
2222
        uint32_t *offset = (uint32_t *)ibuf_alloc(&writer->row_index_buf,
5,523,600✔
2223
                                                  sizeof(uint32_t));
2224
        if (offset == NULL) {
5,523,600!
2225
                diag_set(OutOfMemory, sizeof(uint32_t), "ibuf", "row index");
×
2226
                return -1;
×
2227
        }
2228
        *offset = page->unpacked_size;
5,523,600✔
2229
        if (vy_run_dump_stmt(entry, &writer->data_xlog, page,
5,523,600!
2230
                             writer->cmp_def, writer->iid == 0) != 0)
5,523,600✔
2231
                return -1;
×
2232
        int64_t lsn = vy_stmt_lsn(entry.stmt);
5,523,600✔
2233
        run->info.min_lsn = MIN(run->info.min_lsn, lsn);
5,523,600✔
2234
        run->info.max_lsn = MAX(run->info.max_lsn, lsn);
5,523,600✔
2235
        vy_stmt_stat_acct(&run->info.stmt_stat, vy_stmt_type(entry.stmt));
5,523,600✔
2236
        return 0;
5,523,600✔
2237
}
2238

2239
/**
2240
 * Finish a current page.
2241
 * @param writer Run writer.
2242
 * @retval -1 Memory or IO error.
2243
 * @retval  0 Success.
2244
 */
2245
static int
2246
vy_run_writer_end_page(struct vy_run_writer *writer)
140,200✔
2247
{
2248
        struct vy_run *run = writer->run;
140,200✔
2249
        struct vy_page_info *page = run->page_info + run->info.page_count;
140,200✔
2250

2251
        assert(page->row_count > 0);
140,200!
2252
        assert(ibuf_used(&writer->row_index_buf) ==
140,200!
2253
               sizeof(uint32_t) * page->row_count);
2254

2255
        struct xrow_header xrow;
2256
        uint32_t *row_index = (uint32_t *)writer->row_index_buf.rpos;
140,200✔
2257
        if (vy_row_index_encode(row_index, page->row_count, &xrow) < 0)
140,200!
2258
                return -1;
140,197✔
2259
        ssize_t written = xlog_write_row(&writer->data_xlog, &xrow);
140,200!
2260
        if (written < 0)
140,200!
2261
                return -1;
×
2262
        page->row_index_offset = page->unpacked_size;
140,200✔
2263
        page->unpacked_size += written;
140,200✔
2264

2265
        written = xlog_tx_commit(&writer->data_xlog);
140,200!
2266
        if (written == 0)
140,200✔
2267
                written = xlog_flush(&writer->data_xlog);
140,182✔
2268
        if (written < 0)
140,197!
2269
                return -1;
×
2270
        page->size = written;
140,197✔
2271
        run->info.page_count++;
140,197✔
2272
        vy_run_acct_page(run, page);
140,197!
2273
        ibuf_reset(&writer->row_index_buf);
140,197!
2274
        return 0;
140,197✔
2275
}
2276

2277
int
2278
vy_run_writer_append_stmt(struct vy_run_writer *writer, struct vy_entry entry)
5,523,600✔
2279
{
2280
        int rc = -1;
5,523,600✔
2281
        size_t region_svp = region_used(&fiber()->gc);
5,523,600!
2282
        if (!xlog_is_open(&writer->data_xlog) &&
5,523,600✔
2283
            vy_run_writer_create_xlog(writer) != 0)
3,181!
2284
                goto out;
×
2285
        if (ibuf_used(&writer->row_index_buf) == 0 &&
5,523,600✔
2286
            vy_run_writer_start_page(writer, entry) != 0)
140,203!
2287
                goto out;
×
2288
        if (vy_run_writer_write_to_page(writer, entry) != 0)
5,523,600!
2289
                goto out;
×
2290
        if (obuf_size(&writer->data_xlog.obuf) >= writer->page_size &&
5,523,600✔
2291
            vy_run_writer_end_page(writer) != 0)
137,301!
2292
                goto out;
×
2293
        rc = 0;
5,523,600✔
2294
out:
5,523,600✔
2295
        region_truncate(&fiber()->gc, region_svp);
5,523,600!
2296
        return rc;
5,523,600✔
2297
}
2298

2299
/**
2300
 * Destroy a run writer.
2301
 * @param writer Writer to destroy.
2302
 * @param reuse_fd True in a case of success run write. And else
2303
 *        false.
2304
 */
2305
static void
2306
vy_run_writer_destroy(struct vy_run_writer *writer, bool reuse_fd)
3,262✔
2307
{
2308
        if (writer->last.stmt != NULL)
3,262✔
2309
                vy_stmt_unref_if_possible(writer->last.stmt);
3,173✔
2310
        if (xlog_is_open(&writer->data_xlog))
3,262✔
2311
                xlog_close(&writer->data_xlog, reuse_fd);
3,173✔
2312
        if (writer->bloom != NULL)
3,260✔
2313
                tuple_bloom_builder_delete(writer->bloom);
3,258✔
2314
        ibuf_destroy(&writer->row_index_buf);
3,260✔
2315
}
3,260✔
2316

2317
int
2318
vy_run_writer_commit(struct vy_run_writer *writer)
3,261✔
2319
{
2320
        int rc = -1;
3,261✔
2321
        size_t region_svp = region_used(&fiber()->gc);
3,261!
2322

2323
        if (ibuf_used(&writer->row_index_buf) != 0 &&
3,261✔
2324
            vy_run_writer_end_page(writer) != 0)
2,899!
2325
                goto out;
×
2326

2327
        struct vy_run *run = writer->run;
3,261✔
2328
        if (vy_run_is_empty(run)) {
3,261✔
2329
                vy_run_writer_destroy(writer, false);
87✔
2330
                rc = 0;
87✔
2331
                goto out;
87✔
2332
        }
2333

2334
        assert(writer->last.stmt != NULL);
3,174!
2335
        const char *key = vy_stmt_is_key(writer->last.stmt) ?
3,174✔
2336
                          tuple_data(writer->last.stmt) :
3,174✔
2337
                          tuple_extract_key(writer->last.stmt, writer->cmp_def,
2,441✔
2338
                                            vy_entry_multikey_idx(writer->last,
2339
                                                                  writer->cmp_def),
2340
                                            NULL);
2341
        if (key == NULL)
3,174!
2342
                goto out;
×
2343

2344
        assert(run->info.max_key == NULL);
3,174!
2345
        run->info.max_key = vy_key_dup(key);
3,174✔
2346
        if (run->info.max_key == NULL)
3,174!
2347
                goto out;
×
2348

2349
        ERROR_INJECT(ERRINJ_VY_RUN_FILE_RENAME, {
3,174!
2350
                diag_set(ClientError, ER_INJECTION, "vinyl run file rename");
2351
                goto out;
2352
        });
2353

2354
        /* Sync data and link the file to the final name. */
2355
        if (xlog_sync(&writer->data_xlog) < 0 ||
3,173!
2356
            xlog_rename(&writer->data_xlog) < 0)
3,173!
2357
                goto out;
×
2358

2359
        if (writer->bloom != NULL) {
3,173✔
2360
                run->info.bloom = tuple_bloom_new(writer->bloom,
3,171✔
2361
                                                  writer->bloom_fpr);
2362
                if (run->info.bloom == NULL)
3,171!
2363
                        goto out;
×
2364
        }
2365
        if (vy_run_write_index(run, writer->dirpath,
3,173✔
2366
                               writer->space_id, writer->iid) != 0)
2367
                goto out;
1✔
2368

2369
        run->fd = writer->data_xlog.fd;
3,170✔
2370
        vy_run_writer_destroy(writer, true);
3,170✔
2371
        rc = 0;
3,168✔
2372
out:
3,257✔
2373
        region_truncate(&fiber()->gc, region_svp);
3,257!
2374
        return rc;
3,257✔
2375
}
2376

2377
/**
 * Abort writing a run: release everything the writer holds. The data
 * file, if open, is closed without keeping its descriptor for reuse.
 * @param writer Run writer.
 */
void
vy_run_writer_abort(struct vy_run_writer *writer)
{
	const bool reuse_fd = false;
	vy_run_writer_destroy(writer, reuse_fd);
}
5✔
2382

2383
int
2384
vy_run_rebuild_index(struct vy_run *run, const char *dir,
5✔
2385
                     uint32_t space_id, uint32_t iid,
2386
                     struct key_def *cmp_def, struct key_def *key_def,
2387
                     struct tuple_format *format, const struct index_opts *opts)
2388
{
2389
        assert(run->info.bloom == NULL);
5!
2390
        assert(run->page_info == NULL);
5!
2391
        struct region *region = &fiber()->gc;
5!
2392
        size_t mem_used = region_used(region);
5!
2393

2394
        struct xlog_cursor cursor;
2395
        char path[PATH_MAX];
2396
        vy_run_snprint_path(path, sizeof(path), dir,
5!
2397
                            space_id, iid, run->id, VY_FILE_RUN);
2398

2399
        say_info("rebuilding index for `%s'", path);
5!
2400
        if (xlog_cursor_open(&cursor, path))
5!
2401
                return -1;
5✔
2402

2403
        int rc = 0;
5✔
2404
        uint32_t page_info_capacity = 0;
5✔
2405

2406
        const char *key = NULL;
5✔
2407
        int64_t max_lsn = 0;
5✔
2408
        int64_t min_lsn = INT64_MAX;
5✔
2409
        struct tuple *prev_tuple = NULL;
5✔
2410
        char *page_min_key = NULL;
5✔
2411

2412
        struct tuple_bloom_builder *bloom_builder = NULL;
5✔
2413
        if (opts->bloom_fpr < 1) {
5!
2414
                bloom_builder = tuple_bloom_builder_new(key_def->part_count);
5!
2415
                if (bloom_builder == NULL)
5!
2416
                        goto close_err;
×
2417
        }
2418

2419
        off_t page_offset, next_page_offset = xlog_cursor_pos(&cursor);
5!
2420
        while ((rc = xlog_cursor_next_tx(&cursor)) == 0) {
674!
2421
                region_truncate(region, mem_used);
669!
2422
                page_offset = next_page_offset;
669✔
2423
                next_page_offset = xlog_cursor_pos(&cursor);
669!
2424

2425
                if (run->info.page_count == page_info_capacity &&
669✔
2426
                    vy_run_alloc_page_info(run, &page_info_capacity) != 0)
13!
2427
                        goto close_err;
×
2428
                uint32_t page_row_count = 0;
669✔
2429
                uint64_t page_row_index_offset = 0;
669✔
2430
                uint64_t row_offset = xlog_cursor_tx_pos(&cursor);
669!
2431

2432
                struct xrow_header xrow;
2433
                while ((rc = xlog_cursor_next_row(&cursor, &xrow)) == 0) {
15,346!
2434
                        if (xrow.type == VY_RUN_ROW_INDEX) {
14,677✔
2435
                                page_row_index_offset = row_offset;
669✔
2436
                                row_offset = xlog_cursor_tx_pos(&cursor);
669!
2437
                                continue;
669✔
2438
                        }
2439
                        ++page_row_count;
14,008✔
2440
                        struct tuple *tuple = vy_stmt_decode(&xrow, format);
14,008!
2441
                        if (tuple == NULL)
14,008!
2442
                                goto close_err;
×
2443
                        if (bloom_builder != NULL) {
14,008!
2444
                                struct vy_entry entry = {tuple, HINT_NONE};
14,008✔
2445
                                if (vy_bloom_builder_add(bloom_builder, entry,
14,008!
2446
                                                         key_def) != 0) {
2447
                                        tuple_unref(tuple);
×
2448
                                        goto close_err;
×
2449
                                }
2450
                        }
2451
                        key = vy_stmt_is_key(tuple) ? tuple_data(tuple) :
14,008!
2452
                              tuple_extract_key(tuple, cmp_def,
12,004!
2453
                                                MULTIKEY_NONE, NULL);
2454
                        if (prev_tuple != NULL)
14,008✔
2455
                                tuple_unref(prev_tuple);
14,003!
2456
                        prev_tuple = tuple;
14,008✔
2457
                        if (key == NULL)
14,008!
2458
                                goto close_err;
×
2459
                        if (run->info.min_key == NULL) {
14,008✔
2460
                                run->info.min_key = vy_key_dup(key);
5!
2461
                                if (run->info.min_key == NULL)
5!
2462
                                        goto close_err;
×
2463
                        }
2464
                        if (page_min_key == NULL) {
14,008✔
2465
                                page_min_key = vy_key_dup(key);
669!
2466
                                if (page_min_key == NULL)
669!
2467
                                        goto close_err;
×
2468
                        }
2469
                        if (xrow.lsn > max_lsn)
14,008✔
2470
                                max_lsn = xrow.lsn;
14,006✔
2471
                        if (xrow.lsn < min_lsn)
14,008✔
2472
                                min_lsn = xrow.lsn;
6✔
2473
                        row_offset = xlog_cursor_tx_pos(&cursor);
14,008!
2474
                }
2475
                struct vy_page_info *info;
2476
                info = run->page_info + run->info.page_count;
669✔
2477
                if (vy_page_info_create(info, page_offset,
669!
2478
                                        page_min_key, cmp_def) != 0)
2479
                        goto close_err;
×
2480
                info->row_count = page_row_count;
669✔
2481
                info->size = next_page_offset - page_offset;
669✔
2482
                info->unpacked_size = xlog_cursor_tx_pos(&cursor);
669!
2483
                info->row_index_offset = page_row_index_offset;
669✔
2484
                ++run->info.page_count;
669✔
2485
                vy_run_acct_page(run, info);
669!
2486

2487
                free(page_min_key);
669✔
2488
                page_min_key = NULL;
669✔
2489
        }
2490

2491
        if (key != NULL) {
5!
2492
                run->info.max_key = vy_key_dup(key);
5!
2493
                if (run->info.max_key == NULL)
5!
2494
                        goto close_err;
×
2495
        }
2496
        run->info.max_lsn = max_lsn;
5✔
2497
        run->info.min_lsn = min_lsn;
5✔
2498

2499
        if (prev_tuple != NULL) {
5!
2500
                tuple_unref(prev_tuple);
5!
2501
                prev_tuple = NULL;
5✔
2502
        }
2503
        region_truncate(region, mem_used);
5!
2504
        run->fd = cursor.fd;
5✔
2505
        xlog_cursor_close(&cursor, true);
5!
2506

2507
        if (bloom_builder != NULL) {
5!
2508
                run->info.bloom = tuple_bloom_new(bloom_builder,
5!
2509
                                                  opts->bloom_fpr);
2510
                if (run->info.bloom == NULL)
5!
2511
                        goto close_err;
×
2512
                tuple_bloom_builder_delete(bloom_builder);
5!
2513
                bloom_builder = NULL;
5✔
2514
        }
2515

2516
        /* New run index is ready for write, unlink old file if exists */
2517
        vy_run_snprint_path(path, sizeof(path), dir,
5!
2518
                            space_id, iid, run->id, VY_FILE_INDEX);
2519
        if (unlink(path) < 0 && errno != ENOENT) {
5!
2520
                diag_set(SystemError, "failed to unlink file '%s'",
×
2521
                         path);
2522
                goto close_err;
×
2523
        }
2524
        if (vy_run_write_index(run, dir, space_id, iid) != 0)
5!
2525
                goto close_err;
×
2526
        return 0;
5✔
2527
close_err:
×
2528
        vy_run_clear(run);
×
2529
        region_truncate(region, mem_used);
×
2530
        if (prev_tuple != NULL)
×
2531
                tuple_unref(prev_tuple);
×
2532
        if (page_min_key != NULL)
×
2533
                free(page_min_key);
×
2534
        if (bloom_builder != NULL)
×
2535
                tuple_bloom_builder_delete(bloom_builder);
×
2536
        if (xlog_cursor_is_open(&cursor))
×
2537
                xlog_cursor_close(&cursor, false);
×
2538
        return -1;
×
2539
}
2540

2541
/**
 * Try to remove an empty directory.
 *
 * @param path Path to the directory.
 * @return 0 if the directory was removed or does not exist,
 *         -1 on any other error. A non-empty directory yields -1
 *         without being logged.
 */
static int
try_rmdir(const char *path)
{
	if (coio_rmdir(path) >= 0) {
		say_info("removed %s", path);
		return 0;
	}
	/* Nothing to remove: not an error. */
	if (errno == ENOENT)
		return 0;
	/* A non-empty directory is expected, so it is not logged. */
	if (errno != ENOTEMPTY)
		say_syserror("error while removing %s", path);
	return -1;
}
2562

2563
int
2564
vy_run_remove_files(const char *dir, uint32_t space_id,
2,145✔
2565
                    uint32_t iid, int64_t run_id)
2566
{
2567
        ERROR_INJECT(ERRINJ_VY_GC,
4,287!
2568
                     {say_error("error injection: vinyl run %lld not deleted",
2569
                                (long long)run_id); return -1;});
2570
        int ret = 0;
2,142✔
2571
        char path[PATH_MAX];
2572
        for (int type = 0; type < vy_file_MAX; type++) {
10,710✔
2573
                vy_run_snprint_path(path, sizeof(path), dir,
8,568!
2574
                                    space_id, iid, run_id, type);
2575
                if (coio_unlink(path) < 0) {
8,568!
2576
                        if (errno != ENOENT) {
5,111!
2577
                                say_syserror("error while removing %s", path);
×
2578
                                ret = -1;
×
2579
                        }
2580
                } else
2581
                        say_info("removed %s", path);
3,457!
2582
        }
2583
        /* Remove the root directory if it's empty. */
2584
        vy_lsm_snprint_path(path, sizeof(path), dir, space_id, iid);
2,142!
2585
        if (try_rmdir(path) < 0)
2,142!
2586
                return ret;
2,084✔
2587
        vy_space_snprint_path(path, sizeof(path), dir, space_id);
58!
2588
        try_rmdir(path);
58!
2589
        return ret;
58✔
2590
}
2591

2592
/**
2593
 * Read a page with stream->page_no from the run and save it in stream->page.
2594
 * Support function of slice stream.
2595
 * @param stream - the stream.
2596
 * @return 0 on success, -1 of memory or read error (diag is set).
2597
 */
2598
static NODISCARD int
2599
vy_slice_stream_read_page(struct vy_slice_stream *stream)
111,778✔
2600
{
2601
        struct vy_run *run = stream->slice->run;
111,778✔
2602

2603
        assert(stream->page == NULL);
111,778!
2604
        ZSTD_DStream *zdctx = vy_env_get_zdctx(run->env);
111,778✔
2605
        if (zdctx == NULL)
111,778!
2606
                return -1;
×
2607

2608
        struct vy_page_info *page_info = vy_run_page_info(run, stream->page_no);
111,778✔
2609
        stream->page = vy_page_new(page_info);
111,778✔
2610
        if (stream->page == NULL)
111,778!
2611
                return -1;
×
2612

2613
        if (vy_page_read(stream->page, page_info, run, zdctx) != 0) {
111,778!
2614
                vy_page_delete(stream->page);
×
2615
                stream->page = NULL;
×
2616
                return -1;
×
2617
        }
2618
        return 0;
111,777✔
2619
}
2620

2621
/**
2622
 * Binary search in a run for the given key. Find the first position with
2623
 * a tuple greater or equal to slice
2624
 * @retval 0 success
2625
 * @retval -1 read or memory error
2626
 */
2627
static NODISCARD int
2628
vy_slice_stream_search(struct vy_stmt_stream *virt_stream)
3,799✔
2629
{
2630
        assert(virt_stream->iface->start == vy_slice_stream_search);
3,799!
2631
        struct vy_slice_stream *stream = (struct vy_slice_stream *)virt_stream;
3,799✔
2632
        assert(stream->page == NULL);
3,799!
2633
        if (stream->slice->begin.stmt == NULL) {
3,799✔
2634
                /* Already at the beginning */
2635
                assert(stream->page_no == 0);
1,718!
2636
                assert(stream->pos_in_page == 0);
1,718!
2637
                return 0;
3,799✔
2638
        }
2639

2640
        if (vy_slice_stream_read_page(stream) != 0)
2,081!
2641
                return -1;
×
2642

2643
        bool unused;
2644
        stream->pos_in_page = vy_page_find_key(stream->page,
4,162✔
2645
                                               stream->slice->begin,
2,081!
2646
                                               stream->cmp_def,
2647
                                               stream->format,
2648
                                               ITER_GE, &unused);
2649

2650
        if (stream->pos_in_page == stream->page->row_count) {
2,081✔
2651
                /* The first tuple is in the beginning of the next page */
2652
                vy_page_delete(stream->page);
347!
2653
                stream->page = NULL;
347✔
2654
                stream->page_no++;
347✔
2655
                stream->pos_in_page = 0;
347✔
2656
        }
2657
        return 0;
2,081✔
2658
}
2659

2660
/**
2661
 * Get the value from the stream and move to the next position.
2662
 * Set *ret to the value or NULL if EOF.
2663
 * @param virt_stream - virtual stream.
2664
 * @param ret - pointer to the result.
2665
 * @return 0 on success, -1 on memory or read error.
2666
 */
2667
static NODISCARD int
2668
vy_slice_stream_next(struct vy_stmt_stream *virt_stream, struct vy_entry *ret)
4,252,680✔
2669
{
2670
        assert(virt_stream->iface->next == vy_slice_stream_next);
4,252,680!
2671
        struct vy_slice_stream *stream = (struct vy_slice_stream *)virt_stream;
4,252,680✔
2672
        *ret = vy_entry_none();
4,252,680!
2673

2674
        /* If the slice is ended, return EOF */
2675
        if (stream->page_no > stream->slice->last_page_no)
4,252,680✔
2676
                return 0;
4,252,680✔
2677

2678
        /* If current page is not already read, read it */
2679
        if (stream->page == NULL && vy_slice_stream_read_page(stream) != 0)
4,250,480!
2680
                return -1;
×
2681

2682
        /* Read current tuple from the page */
2683
        struct vy_entry entry = vy_page_stmt(stream->page, stream->pos_in_page,
4,250,470!
2684
                                             stream->cmp_def, stream->format);
2685
        if (entry.stmt == NULL) /* Read or memory error */
4,250,470✔
2686
                return -1;
1✔
2687

2688
        /* Check that the tuple is not out of slice bounds = */
2689
        if (stream->slice->end.stmt != NULL &&
4,250,470✔
2690
            stream->page_no >= stream->slice->last_page_no &&
2,350,360✔
2691
            vy_entry_compare(entry, stream->slice->end, stream->cmp_def) >= 0) {
41,877!
2692
                tuple_unref(entry.stmt);
1,559!
2693
                return 0;
1,559✔
2694
        }
2695

2696
        /* We definitely has the next non-null tuple. Save it in stream */
2697
        if (stream->entry.stmt != NULL)
4,248,910✔
2698
                tuple_unref(stream->entry.stmt);
4,245,220!
2699
        stream->entry = entry;
4,248,910✔
2700
        *ret = entry;
4,248,910✔
2701

2702
        /* Increment position */
2703
        stream->pos_in_page++;
4,248,910✔
2704

2705
        /* Check whether the position is out of page */
2706
        struct vy_page_info *page_info = vy_run_page_info(stream->slice->run,
4,248,910!
2707
                                                          stream->page_no);
2708
        if (stream->pos_in_page >= page_info->row_count) {
4,248,910✔
2709
                /**
2710
                 * Out of page. Free page, move the position to the next page
2711
                 * and * nullify page pointer to read it on the next iteration.
2712
                 */
2713
                vy_page_delete(stream->page);
109,844!
2714
                stream->page = NULL;
109,844✔
2715
                stream->page_no++;
109,844✔
2716
                stream->pos_in_page = 0;
109,844✔
2717
        }
2718

2719
        return 0;
4,248,910✔
2720
}
2721

2722
/**
2723
 * Free resources.
2724
 */
2725
static void
2726
vy_slice_stream_stop(struct vy_stmt_stream *virt_stream)
3,772✔
2727
{
2728
        assert(virt_stream->iface->stop == vy_slice_stream_stop);
3,772!
2729
        struct vy_slice_stream *stream = (struct vy_slice_stream *)virt_stream;
3,772✔
2730
        if (stream->page != NULL) {
3,772✔
2731
                vy_page_delete(stream->page);
1,560✔
2732
                stream->page = NULL;
1,560✔
2733
        }
2734
        if (stream->entry.stmt != NULL) {
3,772✔
2735
                tuple_unref(stream->entry.stmt);
3,672✔
2736
                stream->entry = vy_entry_none();
3,672✔
2737
        }
2738
}
3,772✔
2739

2740
static void
2741
vy_slice_stream_close(struct vy_stmt_stream *virt_stream)
3,753✔
2742
{
2743
        assert(virt_stream->iface->close == vy_slice_stream_close);
3,753!
2744
        struct vy_slice_stream *stream = (struct vy_slice_stream *)virt_stream;
3,753✔
2745
        tuple_format_unref(stream->format);
3,753✔
2746
}
3,753✔
2747

2748
static const struct vy_stmt_stream_iface vy_slice_stream_iface = {
2749
        .start = vy_slice_stream_search,
2750
        .next = vy_slice_stream_next,
2751
        .stop = vy_slice_stream_stop,
2752
        .close = vy_slice_stream_close,
2753
};
2754

2755
void
2756
vy_slice_stream_open(struct vy_slice_stream *stream, struct vy_slice *slice,
3,800✔
2757
                     struct key_def *cmp_def, struct tuple_format *format)
2758
{
2759
        stream->base.iface = &vy_slice_stream_iface;
3,800✔
2760

2761
        stream->page_no = slice->first_page_no;
3,800✔
2762
        stream->pos_in_page = 0; /* We'll find it later */
3,800✔
2763
        stream->page = NULL;
3,800✔
2764
        stream->entry = vy_entry_none();
3,800✔
2765

2766
        stream->slice = slice;
3,800✔
2767
        stream->cmp_def = cmp_def;
3,800✔
2768
        stream->format = format;
3,800✔
2769
        tuple_format_ref(format);
3,800✔
2770
}
3,800✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc