• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / tarantool / 9253825433

27 May 2024 11:10AM UTC coverage: 85.789% (-0.006%) from 85.795%
9253825433

Pull #10020

github

web-flow
Merge 051c335d9 into 69f2ddfcc
Pull Request #10020: [backport 2.11] cmake: bump OpenSSL version 1.1.1 -> 3.2.1

62596 of 113824 branches covered (54.99%)

93468 of 108951 relevant lines covered (85.79%)

2595923.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.69
/src/box/vy_read_iterator.c
1
/*
2
 * Copyright 2010-2017, Tarantool AUTHORS, please see AUTHORS file.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 * 1. Redistributions of source code must retain the above
9
 *    copyright notice, this list of conditions and the
10
 *    following disclaimer.
11
 *
12
 * 2. Redistributions in binary form must reproduce the above
13
 *    copyright notice, this list of conditions and the following
14
 *    disclaimer in the documentation and/or other materials
15
 *    provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
18
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
19
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
21
 * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
22
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
25
 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
28
 * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
29
 * SUCH DAMAGE.
30
 */
31
#include "vy_read_iterator.h"
32
#include "vy_run.h"
33
#include "vy_mem.h"
34
#include "vy_cache.h"
35
#include "vy_tx.h"
36
#include "fiber.h"
37
#include "vy_history.h"
38
#include "vy_lsm.h"
39
#include "vy_stat.h"
40

41
/**
 * Merge source, support structure for vy_read_iterator.
 * Contains source iterator and merge state.
 */
struct vy_read_src {
	/**
	 * Source iterator. Which member is in use depends on the
	 * kind of the source: the transaction write set
	 * (txw_iterator), the tuple cache (cache_iterator), an
	 * in-memory tree (mem_iterator) or a run slice on disk
	 * (run_iterator). See vy_read_iterator_add_{tx,cache,mem,disk}().
	 */
	union {
		struct vy_run_iterator run_iterator;
		struct vy_mem_iterator mem_iterator;
		struct vy_txw_iterator txw_iterator;
		struct vy_cache_iterator cache_iterator;
	};
	/**
	 * Set if the iterator was started. Until then the scan
	 * helpers position the source with *_skip() rather than
	 * stepping it with *_next().
	 */
	bool is_started;
	/**
	 * See vy_read_iterator->front_id. Equal to the read iterator's
	 * front_id when this source is positioned at the current
	 * candidate for the next key.
	 */
	uint32_t front_id;
	/** History of the key the iterator is positioned at. */
	struct vy_history history;
};
60

61
/**
62
 * Extend internal source array capacity to fit capacity sources.
63
 * Not necessary to call is but calling it allows to optimize internal memory
64
 * allocation
65
 */
66
static NODISCARD int
67
vy_read_iterator_reserve(struct vy_read_iterator *itr, uint32_t capacity)
1,580,620✔
68
{
69
        if (itr->src_capacity >= capacity)
1,580,620!
70
                return 0;
×
71
        struct vy_read_src *new_src = calloc(capacity, sizeof(*new_src));
1,580,620✔
72
        if (new_src == NULL) {
1,580,620!
73
                diag_set(OutOfMemory, capacity * sizeof(*new_src),
×
74
                         "calloc", "new_src");
75
                return -1;
×
76
        }
77
        memcpy(new_src, itr->src, itr->src_count * sizeof(*new_src));
1,580,620✔
78
        for (uint32_t i = 0; i < itr->src_count; i++) {
4,165,720✔
79
                vy_history_create(&new_src[i].history,
2,585,100✔
80
                                  &itr->lsm->env->history_node_pool);
2,585,100✔
81
                vy_history_splice(&new_src[i].history, &itr->src[i].history);
2,585,100✔
82
        }
83
        free(itr->src);
1,580,620✔
84
        itr->src = new_src;
1,580,620✔
85
        itr->src_capacity = capacity;
1,580,620✔
86
        return 0;
1,580,620✔
87
}
88

89
/**
90
 * Add another source to read iterator. Must be called before actual
91
 * iteration start and must not be called after.
92
 */
93
static struct vy_read_src *
94
vy_read_iterator_add_src(struct vy_read_iterator *itr)
1,582,900✔
95
{
96
        if (itr->src_count == itr->src_capacity) {
1,582,900✔
97
                if (vy_read_iterator_reserve(itr, itr->src_count + 1) != 0)
1,580,620!
98
                        return NULL;
×
99
        }
100
        struct vy_read_src *src = &itr->src[itr->src_count++];
1,582,900✔
101
        memset(src, 0, sizeof(*src));
1,582,900✔
102
        vy_history_create(&src->history, &itr->lsm->env->history_node_pool);
1,582,900✔
103
        return src;
1,582,900✔
104
}
105

106
/**
107
 * Pin all slices open by the read iterator.
108
 * Used to make sure no run slice is invalidated by
109
 * compaction while we are fetching data from disk.
110
 */
111
static void
112
vy_read_iterator_pin_slices(struct vy_read_iterator *itr)
1,693,850✔
113
{
114
        for (uint32_t i = itr->disk_src; i < itr->src_count; i++) {
2,330,390✔
115
                struct vy_read_src *src = &itr->src[i];
636,540✔
116
                vy_slice_pin(src->run_iterator.slice);
636,540✔
117
        }
118
}
1,693,850✔
119

120
/**
121
 * Unpin all slices open by the read iterator.
122
 * See also: vy_read_iterator_pin_slices().
123
 */
124
static void
125
vy_read_iterator_unpin_slices(struct vy_read_iterator *itr)
1,693,850✔
126
{
127
        for (uint32_t i = itr->disk_src; i < itr->src_count; i++) {
2,330,390✔
128
                struct vy_read_src *src = &itr->src[i];
636,539✔
129
                vy_slice_unpin(src->run_iterator.slice);
636,539✔
130
        }
131
}
1,693,850✔
132

133
/**
134
 * Return true if the current candidate for the next key is outside
135
 * the current range and hence we should move to the next range.
136
 *
137
 * If we are looking for a match (EQ, REQ) and the search key
138
 * doesn't intersect with the current range's boundary, the next
139
 * range can't contain statements matching the search criteria
140
 * and hence there's no point in iterating to it.
141
 */
142
static bool
143
vy_read_iterator_range_is_done(struct vy_read_iterator *itr,
1,693,750✔
144
                               struct vy_entry next)
145
{
146
        struct vy_range *range = itr->curr_range;
1,693,750✔
147
        struct key_def *cmp_def = itr->lsm->cmp_def;
1,693,750✔
148
        int dir = iterator_direction(itr->iterator_type);
1,693,750✔
149

150
        if (dir > 0 && range->end.stmt != NULL &&
1,693,750✔
151
            (next.stmt == NULL || vy_entry_compare(next, range->end,
74,265✔
152
                                                   cmp_def) >= 0) &&
13,295✔
153
            (itr->iterator_type != ITER_EQ ||
13,295✔
154
             vy_entry_compare(itr->key, range->end, cmp_def) >= 0))
13,221✔
155
                return true;
3,250✔
156

157
        if (dir < 0 && range->begin.stmt != NULL &&
1,690,500✔
158
            (next.stmt == NULL || vy_entry_compare(next, range->begin,
9,453!
159
                                                   cmp_def) < 0) &&
×
160
            (itr->iterator_type != ITER_REQ ||
×
161
             vy_entry_compare(itr->key, range->begin, cmp_def) <= 0))
×
162
                return true;
×
163

164
        return false;
1,690,500✔
165
}
166

167
/**
168
 * Compare two tuples from the read iterator perspective.
169
 *
170
 * Returns:
171
 *  < 0 if statement @a precedes statement @b in the iterator output
172
 * == 0 if statements @a and @b are at the same position
173
 *  > 0 if statement @a supersedes statement @b
174
 *
175
 * NULL denotes the statement following the last one.
176
 */
177
static inline int
178
vy_read_iterator_cmp_stmt(struct vy_read_iterator *itr,
12,393,500✔
179
                          struct vy_entry a, struct vy_entry b)
180
{
181
        if (a.stmt == NULL && b.stmt != NULL)
12,393,500✔
182
                return 1;
1,711,440✔
183
        if (a.stmt != NULL && b.stmt == NULL)
10,682,100✔
184
                return -1;
2,835,300✔
185
        if (a.stmt == NULL && b.stmt == NULL)
7,846,810!
186
                return 0;
4,660,620✔
187
        return iterator_direction(itr->iterator_type) *
3,186,190✔
188
                vy_entry_compare(a, b, itr->lsm->cmp_def);
3,186,190✔
189
}
190

191
/**
192
 * Return true if the statement matches search criteria
193
 * and older sources don't need to be scanned.
194
 */
195
static bool
196
vy_read_iterator_is_exact_match(struct vy_read_iterator *itr,
3,130,600✔
197
                                struct vy_entry entry)
198
{
199
        enum iterator_type type = itr->iterator_type;
3,130,600✔
200
        struct key_def *cmp_def = itr->lsm->cmp_def;
3,130,600✔
201

202
        /*
203
         * If the index is unique and the search key is full,
204
         * we can avoid disk accesses on the first iteration
205
         * in case the key is found in memory.
206
         */
207
        return itr->last.stmt == NULL && entry.stmt != NULL &&
241,917!
208
                (type == ITER_EQ || type == ITER_REQ ||
82,205✔
209
                 type == ITER_GE || type == ITER_LE) &&
6,819✔
210
                vy_stmt_is_full_key(itr->key.stmt, cmp_def) &&
3,411,510✔
211
                vy_entry_compare(entry, itr->key, cmp_def) == 0;
38,995✔
212
}
213

214
/**
215
 * Check if the statement at which the given read source
216
 * is positioned precedes the current candidate for the
217
 * next key ('next') and update the latter if so.
218
 * The 'stop' flag is set if the next key is found and
219
 * older sources don't need to be evaluated.
220
 */
221
static void
222
vy_read_iterator_evaluate_src(struct vy_read_iterator *itr,
9,792,290✔
223
                              struct vy_read_src *src,
224
                              struct vy_entry *next, bool *stop)
225
{
226
        uint32_t src_id = src - itr->src;
9,792,290✔
227
        struct vy_entry entry = vy_history_last_stmt(&src->history);
9,792,290!
228
        int cmp = vy_read_iterator_cmp_stmt(itr, entry, *next);
9,792,290!
229
        if (cmp < 0) {
9,792,290✔
230
                assert(entry.stmt != NULL);
3,160,000!
231
                *next = entry;
3,160,000✔
232
                itr->front_id++;
3,160,000✔
233
        }
234
        if (cmp <= 0)
9,792,290✔
235
                src->front_id = itr->front_id;
7,863,630✔
236

237
        itr->skipped_src = MAX(itr->skipped_src, src_id + 1);
9,792,290✔
238

239
        if (cmp < 0 && vy_history_is_terminal(&src->history) &&
9,792,290!
240
            vy_read_iterator_is_exact_match(itr, entry)) {
3,130,600!
241
                itr->skipped_src = src_id + 1;
36,231✔
242
                *stop = true;
36,231✔
243
        }
244
}
9,792,290✔
245

246
/**
247
 * Reevaluate scanned (not skipped) read sources and position 'next' to
248
 * the statement that is minimal from this read iterator's perspective.
249
 * This function assumes that all scanned read sources are up-to-date.
250
 * See also vy_read_iterator_evaluate_src().
251
 */
252
static void
253
vy_read_iterator_reevaluate_srcs(struct vy_read_iterator *itr,
1✔
254
                                 struct vy_entry *next)
255
{
256
        *next = vy_entry_none();
1✔
257
        for (uint32_t i = 0; i < itr->src_count; i++) {
5✔
258
                if (i >= itr->skipped_src)
4!
259
                        break;
×
260
                struct vy_read_src *src = &itr->src[i];
4✔
261
                struct vy_entry entry = vy_history_last_stmt(&src->history);
4!
262
                int cmp = vy_read_iterator_cmp_stmt(itr, entry, *next);
4!
263
                if (cmp < 0) {
4✔
264
                        *next = entry;
1✔
265
                        itr->front_id++;
1✔
266
                }
267
                if (cmp <= 0)
4!
268
                        src->front_id = itr->front_id;
4✔
269
        }
270
}
1✔
271

272
/*
273
 * Each of the functions from the vy_read_iterator_scan_* family
274
 * is used by vy_read_iterator_advance() to:
275
 *
276
 * 1. Update the position of a read source, which implies:
277
 *
278
 *    - Starting iteration over the source if it has not been done
279
 *      yet or restoring the iterator position in case the source
280
 *      has been modified since the last iteration.
281
 *
282
 *    - Advancing the iterator position to the first statement
283
 *      following the one returned on the previous iteration.
284
 *      To avoid an extra tuple comparison, we maintain front_id
285
 *      for each source: all sources with front_id equal to the
286
 *      front_id of the read iterator were used on the previous
287
 *      iteration and hence need to be advanced.
288
 *
289
 * 2. Update the candidate for the next key ('next') if the
290
 *    statement at which the source is positioned precedes it.
291
 *    The 'stop' flag is set if older sources do not need to be
292
 *    scanned (e.g. because a chain was found in the cache).
293
 *    See also vy_read_iterator_evaluate_src().
294
 */
295

296
static NODISCARD int
297
vy_read_iterator_scan_txw(struct vy_read_iterator *itr,
3,151,170✔
298
                          struct vy_entry *next, bool *stop)
299
{
300
        struct vy_read_src *src = &itr->src[itr->txw_src];
3,151,170✔
301
        struct vy_txw_iterator *src_itr = &src->txw_iterator;
3,151,170✔
302

303
        if (itr->tx == NULL)
3,151,170✔
304
                return 0;
191,754✔
305

306
        assert(itr->txw_src < itr->skipped_src);
2,959,420!
307

308
        int rc = vy_txw_iterator_restore(src_itr, itr->last, &src->history);
2,959,420✔
309
        if (rc == 0) {
2,959,420✔
310
                if (!src->is_started) {
2,959,250✔
311
                        rc = vy_txw_iterator_skip(src_itr, itr->last,
375,566✔
312
                                                  &src->history);
313
                } else if (src->front_id == itr->prev_front_id) {
2,583,680✔
314
                        rc = vy_txw_iterator_next(src_itr, &src->history);
173,018✔
315
                }
316
                src->is_started = true;
2,959,250✔
317
        }
318
        if (rc < 0)
2,959,420!
319
                return -1;
×
320

321
        vy_read_iterator_evaluate_src(itr, src, next, stop);
2,959,420✔
322
        return 0;
2,959,420✔
323
}
324

325
static NODISCARD int
326
vy_read_iterator_scan_cache(struct vy_read_iterator *itr,
3,145,890✔
327
                            struct vy_entry *next, bool *stop)
328
{
329
        bool is_interval = false;
3,145,890✔
330
        struct vy_read_src *src = &itr->src[itr->cache_src];
3,145,890✔
331
        struct vy_cache_iterator *src_itr = &src->cache_iterator;
3,145,890✔
332

333
        int rc = vy_cache_iterator_restore(src_itr, itr->last,
3,145,890!
334
                                           &src->history, &is_interval);
335
        if (rc == 0) {
3,145,890✔
336
                if (!src->is_started || itr->cache_src >= itr->skipped_src) {
3,091,140!
337
                        rc = vy_cache_iterator_skip(src_itr, itr->last,
373,974!
338
                                                &src->history, &is_interval);
339
                } else if (src->front_id == itr->prev_front_id) {
2,717,170✔
340
                        rc = vy_cache_iterator_next(src_itr, &src->history,
1,274,010!
341
                                                    &is_interval);
342
                }
343
                src->is_started = true;
3,091,140✔
344
        }
345
        if (rc < 0)
3,145,890!
346
                return -1;
3,145,890✔
347

348
        vy_read_iterator_evaluate_src(itr, src, next, stop);
3,145,890!
349
        if (is_interval) {
3,145,890✔
350
                itr->skipped_src = itr->cache_src + 1;
1,451,900✔
351
                *stop = true;
1,451,900✔
352
        }
353
        return 0;
3,145,890✔
354
}
355

356
static NODISCARD int
357
vy_read_iterator_scan_mem(struct vy_read_iterator *itr, uint32_t mem_src,
3,050,470✔
358
                          struct vy_entry *next, bool *stop,
359
                          int64_t *min_skipped_plsn)
360
{
361
        int rc;
362
        struct vy_read_src *src = &itr->src[mem_src];
3,050,470✔
363
        struct vy_mem_iterator *src_itr = &src->mem_iterator;
3,050,470✔
364

365
        assert(mem_src >= itr->mem_src && mem_src < itr->disk_src);
3,050,470!
366

367
        rc = vy_mem_iterator_restore(src_itr, itr->last, &src->history);
3,050,470✔
368
        if (rc == 0) {
3,050,470✔
369
                if (!src->is_started || mem_src >= itr->skipped_src) {
3,024,900✔
370
                        rc = vy_mem_iterator_skip(src_itr, itr->last,
323,916✔
371
                                                  &src->history);
372
                } else if (src->front_id == itr->prev_front_id) {
2,700,990✔
373
                        rc = vy_mem_iterator_next(src_itr, &src->history);
1,112,200✔
374
                }
375
                src->is_started = true;
3,024,900✔
376
        }
377
        if (rc < 0)
3,050,470!
378
                return -1;
×
379
        vy_read_iterator_evaluate_src(itr, src, next, stop);
3,050,470✔
380
        *min_skipped_plsn = MIN(*min_skipped_plsn, src_itr->min_skipped_plsn);
3,050,470✔
381
        return 0;
3,050,470✔
382
}
383

384
static NODISCARD int
385
vy_read_iterator_scan_disk(struct vy_read_iterator *itr, uint32_t disk_src,
636,520✔
386
                           struct vy_entry *next, bool *stop)
387
{
388
        int rc = 0;
636,520✔
389
        struct vy_read_src *src = &itr->src[disk_src];
636,520✔
390
        struct vy_run_iterator *src_itr = &src->run_iterator;
636,520✔
391

392
        assert(disk_src >= itr->disk_src && disk_src < itr->src_count);
636,520!
393

394
        if (!src->is_started || disk_src >= itr->skipped_src)
636,520✔
395
                rc = vy_run_iterator_skip(src_itr, itr->last,
35,479✔
396
                                          &src->history);
397
        else if (src->front_id == itr->prev_front_id)
601,041✔
398
                rc = vy_run_iterator_next(src_itr, &src->history);
234,805✔
399
        src->is_started = true;
636,519✔
400

401
        if (rc < 0)
636,519✔
402
                return -1;
7✔
403

404
        vy_read_iterator_evaluate_src(itr, src, next, stop);
636,512✔
405
        return 0;
636,512✔
406
}
407

408
/**
 * Restore the position of the active in-memory tree iterator
 * after a yield caused by a disk read and update 'next'
 * if necessary.
 *
 * Only the source at itr->mem_src is re-checked. If
 * vy_mem_iterator_restore() reports a change (positive return value),
 * the newest statement of that source is compared with 'next':
 *  - if it follows 'next', nothing is updated, unless the source was
 *    part of the current front. In that case all scanned sources are
 *    reevaluated;
 *  - if it precedes 'next', it becomes the new candidate and starts
 *    a new front (itr->front_id is bumped);
 *  - if it is at the same position, it becomes the candidate. The
 *    cache source's history is also cleaned up if that source belongs
 *    to the current front.
 *
 * Returns 0 on success, whether or not anything changed. Returns the
 * negative value of vy_mem_iterator_restore() on failure.
 */
static NODISCARD int
vy_read_iterator_restore_mem(struct vy_read_iterator *itr,
			     struct vy_entry *next)
{
	int rc;
	int cmp;
	struct vy_read_src *src = &itr->src[itr->mem_src];

	/*
	 * 'next' may refer to a statement in the memory source history,
	 * which may be cleaned up by vy_mem_iterator_restore(), so we need
	 * to take a reference to it.
	 */
	struct tuple *next_stmt_ref = next->stmt;
	if (next_stmt_ref != NULL)
		tuple_ref(next_stmt_ref);

	rc = vy_mem_iterator_restore(&src->mem_iterator,
				     itr->last, &src->history);
	if (rc < 0)
		goto out; /* memory allocation error */
	if (rc == 0)
		goto out; /* nothing changed */

	/* The memory source was updated. Reevaluate it for 'next'. */
	rc = 0;
	struct vy_entry entry = vy_history_last_stmt(&src->history);
	cmp = vy_read_iterator_cmp_stmt(itr, entry, *next);
	if (cmp > 0) {
		/*
		 * Normally, memory trees are append-only so if the source is
		 * not on top of the heap after restoration, it was not before.
		 * There's one exception to this rule though: a statement may
		 * be deleted from a memory tree on rollback after a WAL write
		 * failure. If the deleted statement was on top of the heap,
		 * we need to reevaluate all read sources to reposition the
		 * iterator to the minimal statement.
		 */
		if (src->front_id == itr->front_id)
			vy_read_iterator_reevaluate_srcs(itr, next);
		goto out;
	}
	/* The new statement is a better candidate for 'next'. */
	*next = entry;
	if (cmp < 0) {
		/*
		 * The new statement precedes the current
		 * candidate for the next key.
		 */
		itr->front_id++;
	} else {
		/*
		 * The new statement updates the next key.
		 * Make sure we don't read the old value
		 * from the cache while applying UPSERTs.
		 */
		struct vy_read_src *cache_src = &itr->src[itr->cache_src];
		if (cache_src->front_id == itr->front_id)
			vy_history_cleanup(&cache_src->history);
	}
	/* Tag the memory source as part of the (possibly new) front. */
	src->front_id = itr->front_id;
out:
	/* Drop the reference taken at the top on every exit path. */
	if (next_stmt_ref != NULL)
		tuple_unref(next_stmt_ref);
	return rc;
}
479

480
/*
 * Forward declarations: vy_read_iterator_advance() below calls these
 * static helpers, which are defined further down in this file.
 */
static void vy_read_iterator_restore(struct vy_read_iterator *itr);
static void vy_read_iterator_next_range(struct vy_read_iterator *itr);
485

486
/**
 * Advance the iterator to the next key.
 *
 * Read sources are scanned from the newest to the oldest: the
 * transaction write set, the cache, the in-memory trees, then the run
 * slices of the current range. Each source is repositioned (see the
 * vy_read_iterator_scan_*() helpers). The statement that comes first
 * from this iterator's perspective becomes the next key, and the
 * sources positioned at it are tagged with itr->front_id (see
 * vy_read_iterator_evaluate_src()).
 *
 * Afterwards itr->front_id is bumped, so that no source carries it,
 * in two cases:
 *  - a full-key EQ/REQ lookup that has already returned a statement;
 *  - a need_check_eq lookup whose next key differs from the search key.
 * Presumably the caller takes the result from the sources whose
 * front_id equals itr->front_id. TODO: confirm against the caller.
 *
 * May yield while reading from disk. The scan restarts if the LSM tree
 * changed meanwhile.
 *
 * Returns 0 on success, -1 on error.
 */
static NODISCARD int
vy_read_iterator_advance(struct vy_read_iterator *itr)
{
	if (itr->last.stmt != NULL && (itr->iterator_type == ITER_EQ ||
				       itr->iterator_type == ITER_REQ) &&
	    vy_stmt_is_full_key(itr->key.stmt, itr->lsm->cmp_def)) {
		/*
		 * There may be one statement at max satisfying
		 * EQ with a full key.
		 */
		itr->front_id++;
		return 0;
	}
	/*
	 * Restore the iterator position if the LSM tree has changed
	 * since the last iteration or this is the first iteration.
	 */
	if (!itr->is_started ||
	    itr->mem_list_version != itr->lsm->mem_list_version ||
	    itr->range_tree_version != itr->lsm->range_tree_version ||
	    itr->range_version != itr->curr_range->version) {
		vy_read_iterator_restore(itr);
	}
	itr->is_started = true;
restart:
	/*
	 * prev_front_id identifies the sources that produced the previous
	 * key: the scan helpers step exactly those with *_next(). The new
	 * front_id tags the sources positioned at the key found now.
	 */
	itr->prev_front_id = itr->front_id;
	itr->front_id++;

	/*
	 * Look up the next key in read sources starting
	 * from the one that stores newest data.
	 */
	bool stop = false;
	struct vy_entry next = vy_entry_none();
	if (vy_read_iterator_scan_txw(itr, &next, &stop) != 0)
		return -1;
	if (stop)
		goto done;
	if (vy_read_iterator_scan_cache(itr, &next, &stop) != 0)
		return -1;
	if (stop)
		goto done;

	/*
	 * Scan the in-memory trees. Each one reports the minimal 'plsn'
	 * of the statements it skipped (INT64_MAX if none).
	 */
	int64_t min_skipped_plsn = INT64_MAX;
	for (uint32_t i = itr->mem_src; i < itr->disk_src && !stop; i++) {
		if (vy_read_iterator_scan_mem(itr, i, &next, &stop,
					      &min_skipped_plsn) != 0)
			return -1;
	}
	/*
	 * If some statements were skipped, send the transaction to a
	 * read view based on that LSN. This may abort the transaction,
	 * which is reported as a conflict.
	 */
	if (itr->tx != NULL && min_skipped_plsn != INT64_MAX) {
		if (vy_tx_send_to_read_view(itr->tx, min_skipped_plsn) != 0)
			return -1;
		if (itr->tx->state == VINYL_TX_ABORT) {
			diag_set(ClientError, ER_TRANSACTION_CONFLICT);
			return -1;
		}
	}
	if (stop)
		goto done;
rescan_disk:
	/* The following code may yield as it needs to access disk. */
	vy_read_iterator_pin_slices(itr);
	for (uint32_t i = itr->disk_src; i < itr->src_count; i++) {
		if (vy_read_iterator_scan_disk(itr, i, &next, &stop) != 0) {
			vy_read_iterator_unpin_slices(itr);
			return -1;
		}
		if (stop)
			break;
	}
	vy_read_iterator_unpin_slices(itr);
	/*
	 * The transaction could have been aborted while we were
	 * reading disk. We must stop now and return an error as
	 * this function could be called by a DML request that
	 * was aborted by a DDL operation: failing will prevent
	 * it from dereferencing a destroyed space.
	 */
	if (itr->tx != NULL && itr->tx->state == VINYL_TX_ABORT) {
		diag_set(ClientError, ER_TRANSACTION_CONFLICT);
		return -1;
	}
	/*
	 * The list of in-memory indexes and/or the range tree could
	 * have been modified by dump/compaction while we were fetching
	 * data from disk. Restart the iterator if this is the case.
	 * Note, we don't need to check the current range's version,
	 * because all slices were pinned and hence could not be
	 * removed.
	 */
	if (itr->mem_list_version != itr->lsm->mem_list_version ||
	    itr->range_tree_version != itr->lsm->range_tree_version) {
		vy_read_iterator_restore(itr);
		goto restart;
	}
	/*
	 * The transaction write set couldn't change during the yield
	 * as it is owned exclusively by the current fiber so the only
	 * source to check is the active in-memory tree.
	 */
	if (vy_read_iterator_restore_mem(itr, &next) != 0)
		return -1;
	/*
	 * Scan the next range in case we transgressed the current
	 * range's boundaries.
	 */
	if (vy_read_iterator_range_is_done(itr, next)) {
		vy_read_iterator_next_range(itr);
		goto rescan_disk;
	}
done:
#ifndef NDEBUG
	/* Check that the statement meets search criteria. */
	if (next.stmt != NULL) {
		int cmp = vy_entry_compare(next, itr->key, itr->lsm->cmp_def);
		cmp *= iterator_direction(itr->iterator_type);
		if (itr->iterator_type == ITER_GT ||
		    itr->iterator_type == ITER_LT)
			assert(cmp > 0);
		else
			assert(cmp >= 0);
	}
	/*
	 * Ensure the read iterator does not return duplicates
	 * and respects statement order.
	 */
	if (itr->last.stmt != NULL && next.stmt != NULL) {
	       assert(vy_read_iterator_cmp_stmt(itr, next, itr->last) > 0);
	}
#endif
	/*
	 * For lookups that must match the search key exactly
	 * (need_check_eq), a different next key means there are no more
	 * matches. Bumping front_id leaves no source tagged with it.
	 * Presumably this serves REQ, whose sources are opened as ITER_LE.
	 * TODO: confirm where need_check_eq is set.
	 */
	if (itr->need_check_eq && next.stmt != NULL &&
	    vy_entry_compare(next, itr->key, itr->lsm->cmp_def) != 0)
		itr->front_id++;
	return 0;
}
625

626
/** Add the transaction source to the read iterator. */
627
static void
628
vy_read_iterator_add_tx(struct vy_read_iterator *itr)
375,566✔
629
{
630
        assert(itr->tx != NULL);
375,566!
631
        enum iterator_type iterator_type = (itr->iterator_type != ITER_REQ ?
751,132✔
632
                                            itr->iterator_type : ITER_LE);
375,566✔
633
        struct vy_txw_iterator_stat *stat = &itr->lsm->stat.txw.iterator;
375,566✔
634
        struct vy_read_src *sub_src = vy_read_iterator_add_src(itr);
375,566✔
635
        vy_txw_iterator_open(&sub_src->txw_iterator, stat, itr->tx, itr->lsm,
375,566✔
636
                             iterator_type, itr->key);
637
}
375,566✔
638

639
/** Add the cache source to the read iterator. */
640
static void
641
vy_read_iterator_add_cache(struct vy_read_iterator *itr, bool is_prepared_ok)
379,250✔
642
{
643
        enum iterator_type iterator_type = (itr->iterator_type != ITER_REQ ?
758,500✔
644
                                            itr->iterator_type : ITER_LE);
379,250✔
645
        struct vy_read_src *sub_src = vy_read_iterator_add_src(itr);
379,250✔
646
        vy_cache_iterator_open(&sub_src->cache_iterator, &itr->lsm->cache,
379,250✔
647
                               iterator_type, itr->key, itr->read_view,
648
                               is_prepared_ok);
649
}
379,250✔
650

651
/** Add the memory level source to the read iterator. */
652
static void
653
vy_read_iterator_add_mem(struct vy_read_iterator *itr, bool is_prepared_ok)
379,250✔
654
{
655
        enum iterator_type iterator_type = (itr->iterator_type != ITER_REQ ?
758,500✔
656
                                            itr->iterator_type : ITER_LE);
379,250✔
657
        struct vy_lsm *lsm = itr->lsm;
379,250✔
658
        struct vy_read_src *sub_src;
659

660
        /* Add the active in-memory index. */
661
        assert(lsm->mem != NULL);
379,250!
662
        sub_src = vy_read_iterator_add_src(itr);
379,250✔
663
        vy_mem_iterator_open(&sub_src->mem_iterator, &lsm->stat.memory.iterator,
379,250✔
664
                             lsm->mem, iterator_type, itr->key, itr->read_view,
665
                             is_prepared_ok);
666
        /* Add sealed in-memory indexes. */
667
        struct vy_mem *mem;
668
        rlist_foreach_entry(mem, &lsm->sealed, in_sealed) {
1,256,480✔
669
                sub_src = vy_read_iterator_add_src(itr);
248,989✔
670
                vy_mem_iterator_open(&sub_src->mem_iterator,
248,989✔
671
                                     &lsm->stat.memory.iterator,
672
                                     mem, iterator_type, itr->key,
673
                                     itr->read_view, is_prepared_ok);
674
        }
675
}
379,250✔
676

677
/** Add the disk level source to the read iterator. */
678
static void
679
vy_read_iterator_add_disk(struct vy_read_iterator *itr)
382,500✔
680
{
681
        assert(itr->curr_range != NULL);
382,500!
682
        enum iterator_type iterator_type = (itr->iterator_type != ITER_REQ ?
765,000✔
683
                                            itr->iterator_type : ITER_LE);
382,500✔
684
        struct vy_lsm *lsm = itr->lsm;
382,500✔
685
        struct vy_slice *slice;
686
        /*
687
         * The format of the statement must be exactly the space
688
         * format with the same identifier to fully match the
689
         * format in vy_mem.
690
         */
691
        rlist_foreach_entry(slice, &itr->curr_range->slices, in_range) {
1,164,700✔
692
                struct vy_read_src *sub_src = vy_read_iterator_add_src(itr);
199,848✔
693
                vy_run_iterator_open(&sub_src->run_iterator,
199,848✔
694
                                     &lsm->stat.disk.iterator, slice,
695
                                     iterator_type, itr->key,
696
                                     itr->read_view, lsm->cmp_def,
697
                                     lsm->key_def, lsm->disk_format);
698
        }
699
}
382,500✔
700

701
/**
702
 * Close all open sources and reset the merge state.
703
 */
704
static void
705
vy_read_iterator_cleanup(struct vy_read_iterator *itr)
758,106✔
706
{
707
        uint32_t i;
708
        struct vy_read_src *src;
709

710
        if (itr->txw_src < itr->src_count) {
758,106✔
711
                src = &itr->src[itr->txw_src];
375,564✔
712
                vy_history_cleanup(&src->history);
375,564✔
713
                vy_txw_iterator_close(&src->txw_iterator);
375,564✔
714
        }
715
        if (itr->cache_src < itr->src_count) {
758,106✔
716
                src = &itr->src[itr->cache_src];
379,247✔
717
                vy_history_cleanup(&src->history);
379,247✔
718
                vy_cache_iterator_close(&src->cache_iterator);
379,247✔
719
        }
720
        for (i = itr->mem_src; i < itr->disk_src; i++) {
1,386,340✔
721
                src = &itr->src[i];
628,236✔
722
                vy_history_cleanup(&src->history);
628,236✔
723
                vy_mem_iterator_close(&src->mem_iterator);
628,236✔
724
        }
725
        for (i = itr->disk_src; i < itr->src_count; i++) {
956,752✔
726
                src = &itr->src[i];
198,646✔
727
                vy_history_cleanup(&src->history);
198,646✔
728
                vy_run_iterator_close(&src->run_iterator);
198,646✔
729
        }
730

731
        itr->txw_src = UINT32_MAX;
758,106✔
732
        itr->cache_src = UINT32_MAX;
758,106✔
733
        itr->mem_src = UINT32_MAX;
758,106✔
734
        itr->disk_src = UINT32_MAX;
758,106✔
735
        itr->skipped_src = UINT32_MAX;
758,106✔
736
        itr->src_count = 0;
758,106✔
737
}
758,106✔
738

739
void
740
vy_read_iterator_open_after(struct vy_read_iterator *itr, struct vy_lsm *lsm,
378,861✔
741
                            struct vy_tx *tx, enum iterator_type iterator_type,
742
                            struct vy_entry key, struct vy_entry last,
743
                            const struct vy_read_view **rv)
744
{
745
        memset(itr, 0, sizeof(*itr));
378,861✔
746

747
        itr->lsm = lsm;
378,861✔
748
        itr->tx = tx;
378,861✔
749
        itr->iterator_type = iterator_type;
378,861✔
750
        itr->key = key;
378,861✔
751
        itr->read_view = rv;
378,861✔
752
        itr->last = last;
378,861✔
753
        itr->last_cached = vy_entry_none();
378,861✔
754
        itr->is_first_cached = (itr->last.stmt == NULL);
378,861✔
755

756
        if (vy_stmt_is_empty_key(key.stmt)) {
378,861✔
757
                /*
758
                 * Strictly speaking, a GT/LT iterator should return
759
                 * nothing if the key is empty, because every key is
760
                 * equal to the empty key, but historically we return
761
                 * all keys instead. So use GE/LE instead of GT/LT
762
                 * in this case.
763
                 */
764
                itr->iterator_type = iterator_direction(iterator_type) > 0 ?
121,566✔
765
                                     ITER_GE : ITER_LE;
60,783✔
766
        }
767

768
        if (iterator_type == ITER_ALL)
378,861✔
769
                itr->iterator_type = ITER_GE;
3,550✔
770

771
        if (iterator_type == ITER_REQ) {
378,861✔
772
                /*
773
                 * Source iterators cannot handle ITER_REQ and
774
                 * use ITER_LE instead, so we need to enable EQ
775
                 * check in this case.
776
                 *
777
                 * See vy_read_iterator_add_{tx,cache,mem,run}.
778
                 */
779
                itr->need_check_eq = true;
4,955✔
780
        }
781
}
378,861✔
782

783
/**
784
 * Restart the read iterator from the position following
785
 * the last statement returned to the user. Called when
786
 * the current range or the whole range tree is changed.
787
 * Also used for preparing the iterator for the first
788
 * iteration.
789
 */
790
static void
791
vy_read_iterator_restore(struct vy_read_iterator *itr)
379,250✔
792
{
793
        vy_read_iterator_cleanup(itr);
379,250✔
794

795
        itr->mem_list_version = itr->lsm->mem_list_version;
379,250✔
796
        itr->range_tree_version = itr->lsm->range_tree_version;
379,250✔
797
        itr->curr_range = vy_range_tree_find_by_key(&itr->lsm->range_tree,
379,250✔
798
                                                    itr->iterator_type,
799
                                                    itr->last.stmt != NULL ?
379,250✔
800
                                                    itr->last : itr->key);
801
        itr->range_version = itr->curr_range->version;
379,250✔
802

803
        bool is_prepared_ok = true;
379,250✔
804
        if (itr->tx != NULL) {
379,250✔
805
                is_prepared_ok = vy_tx_is_prepared_ok(itr->tx);
375,566✔
806
                itr->txw_src = itr->src_count;
375,566✔
807
                vy_read_iterator_add_tx(itr);
375,566✔
808
        }
809

810
        itr->cache_src = itr->src_count;
379,250✔
811
        vy_read_iterator_add_cache(itr, is_prepared_ok);
379,250✔
812

813
        itr->mem_src = itr->src_count;
379,250✔
814
        vy_read_iterator_add_mem(itr, is_prepared_ok);
379,250✔
815

816
        itr->disk_src = itr->src_count;
379,250✔
817
        vy_read_iterator_add_disk(itr);
379,250✔
818
}
379,250✔
819

820
/**
821
 * Iterate to the next range.
822
 */
823
static void
824
vy_read_iterator_next_range(struct vy_read_iterator *itr)
3,250✔
825
{
826
        struct vy_range *range = itr->curr_range;
3,250✔
827
        struct key_def *cmp_def = itr->lsm->cmp_def;
3,250✔
828
        int dir = iterator_direction(itr->iterator_type);
3,250✔
829

830
        assert(range != NULL);
3,250!
831
        while (true) {
832
                range = dir > 0 ?
3,250✔
833
                        vy_range_tree_next(&itr->lsm->range_tree, range) :
3,250!
834
                        vy_range_tree_prev(&itr->lsm->range_tree, range);
×
835
                assert(range != NULL);
3,250!
836

837
                if (itr->last.stmt == NULL)
3,250✔
838
                        break;
3,113✔
839
                /*
840
                 * We could skip an entire range due to the cache.
841
                 * Make sure the next statement falls in the range.
842
                 */
843
                if (dir > 0 && (range->end.stmt == NULL ||
137!
844
                                vy_entry_compare(itr->last, range->end,
106!
845
                                                 cmp_def) < 0))
846
                        break;
847
                if (dir < 0 && (range->begin.stmt == NULL ||
×
848
                                vy_entry_compare(itr->last, range->begin,
×
849
                                                 cmp_def) > 0))
850
                        break;
851
        }
852
        itr->curr_range = range;
3,250✔
853
        itr->range_version = range->version;
3,250✔
854

855
        for (uint32_t i = itr->disk_src; i < itr->src_count; i++) {
4,449✔
856
                struct vy_read_src *src = &itr->src[i];
1,199✔
857
                vy_run_iterator_close(&src->run_iterator);
1,199✔
858
        }
859
        itr->src_count = itr->disk_src;
3,250✔
860

861
        vy_read_iterator_add_disk(itr);
3,250✔
862
}
3,250✔
863

864
/**
865
 * Get a resultant statement for the current key.
866
 * Returns 0 on success, -1 on error.
867
 */
868
static NODISCARD int
869
vy_read_iterator_apply_history(struct vy_read_iterator *itr,
3,161,770✔
870
                               struct vy_entry *ret)
871
{
872
        struct vy_lsm *lsm = itr->lsm;
3,161,770✔
873
        struct vy_history history;
874
        vy_history_create(&history, &lsm->env->history_node_pool);
3,161,770!
875

876
        for (uint32_t i = 0; i < itr->src_count; i++) {
8,913,300✔
877
                struct vy_read_src *src = &itr->src[i];
8,561,040✔
878
                if (src->front_id == itr->front_id) {
8,561,040✔
879
                        vy_history_splice(&history, &src->history);
3,772,550!
880
                        if (vy_history_is_terminal(&history))
3,772,550!
881
                                break;
2,809,510✔
882
                }
883
        }
884

885
        int upserts_applied = 0;
3,161,770✔
886
        int rc = vy_history_apply(&history, lsm->cmp_def,
3,161,770!
887
                                  true, &upserts_applied, ret);
888

889
        lsm->stat.upsert.applied += upserts_applied;
3,161,770✔
890
        vy_history_cleanup(&history);
3,161,770!
891
        return rc;
3,161,770✔
892
}
893

894
/**
895
 * Track a read in the conflict manager.
896
 */
897
static int
898
vy_read_iterator_track_read(struct vy_read_iterator *itr, struct vy_entry entry)
3,161,770✔
899
{
900
        if (itr->tx == NULL)
3,161,770✔
901
                return 0;
191,746✔
902

903
        if (entry.stmt == NULL) {
2,970,020✔
904
                entry = (itr->iterator_type == ITER_EQ ||
272,700✔
905
                         itr->iterator_type == ITER_REQ ?
59,031✔
906
                         itr->key : itr->lsm->env->empty_key);
381,942✔
907
        }
908

909
        int rc;
910
        if (iterator_direction(itr->iterator_type) >= 0) {
2,970,020✔
911
                rc = vy_tx_track(itr->tx, itr->lsm, itr->key,
2,550,850✔
912
                                 itr->iterator_type != ITER_GT,
2,550,850✔
913
                                 entry, true);
914
        } else {
915
                rc = vy_tx_track(itr->tx, itr->lsm, entry, true,
419,177✔
916
                                 itr->key, itr->iterator_type != ITER_LT);
419,177✔
917
        }
918
        return rc;
2,970,020✔
919
}
920

921
NODISCARD int
922
vy_read_iterator_next(struct vy_read_iterator *itr, struct vy_entry *result)
2,743,760✔
923
{
924
        assert(itr->tx == NULL || itr->tx->state == VINYL_TX_READY);
2,743,760!
925

926
        struct vy_entry entry;
927
next_key:
2,743,760✔
928
        if (vy_read_iterator_advance(itr) != 0)
3,161,780!
929
                return -1;
2,743,760✔
930
        if (vy_read_iterator_apply_history(itr, &entry) != 0)
3,161,770!
931
                return -1;
×
932
        if (vy_read_iterator_track_read(itr, entry) != 0)
3,161,770!
933
                return -1;
×
934

935
        if (itr->last.stmt != NULL)
3,161,770✔
936
                tuple_unref(itr->last.stmt);
2,783,190!
937
        itr->last = entry;
3,161,770✔
938

939
        if (entry.stmt != NULL && vy_stmt_type(entry.stmt) == IPROTO_DELETE) {
3,161,770!
940
                /*
941
                 * We don't return DELETEs so skip to the next key.
942
                 * If the DELETE was read from TX write set, there
943
                 * is a good chance that the space actually has
944
                 * the deleted key and hence we must not consider
945
                 * previous + current tuple as an unbroken chain.
946
                 */
947
                if (vy_stmt_lsn(entry.stmt) == INT64_MAX) {
418,022!
948
                        if (itr->last_cached.stmt != NULL)
3,209✔
949
                                tuple_unref(itr->last_cached.stmt);
8!
950
                        itr->last_cached = vy_entry_none();
3,209!
951
                }
952
                goto next_key;
418,022✔
953
        }
954
        assert(entry.stmt == NULL ||
2,743,750!
955
               vy_stmt_type(entry.stmt) == IPROTO_INSERT ||
956
               vy_stmt_type(entry.stmt) == IPROTO_REPLACE);
957

958
        *result = entry;
2,743,750✔
959
        return 0;
2,743,750✔
960
}
961

962
void
963
vy_read_iterator_cache_add(struct vy_read_iterator *itr, struct vy_entry entry)
2,554,570✔
964
{
965
        if ((**itr->read_view).vlsn != INT64_MAX) {
2,554,570✔
966
                if (itr->last_cached.stmt != NULL)
117,071✔
967
                        tuple_unref(itr->last_cached.stmt);
1,935✔
968
                itr->last_cached = vy_entry_none();
117,071✔
969
                return;
117,071✔
970
        }
971
        vy_cache_add(&itr->lsm->cache, entry, itr->last_cached,
2,437,500✔
972
                     itr->is_first_cached, itr->key, itr->iterator_type);
2,437,500✔
973
        if (entry.stmt != NULL)
2,437,500✔
974
                tuple_ref(entry.stmt);
2,113,940✔
975
        if (itr->last_cached.stmt != NULL)
2,437,500✔
976
                tuple_unref(itr->last_cached.stmt);
2,064,220✔
977
        itr->last_cached = entry;
2,437,500✔
978
        itr->is_first_cached = false;
2,437,500✔
979
}
980

981
/**
982
 * Close the iterator and free resources
983
 */
984
void
985
vy_read_iterator_close(struct vy_read_iterator *itr)
378,856✔
986
{
987
        if (itr->last.stmt != NULL)
378,856✔
988
                tuple_unref(itr->last.stmt);
47,948✔
989
        if (itr->last_cached.stmt != NULL)
378,856✔
990
                tuple_unref(itr->last_cached.stmt);
47,781✔
991
        vy_read_iterator_cleanup(itr);
378,856✔
992
        free(itr->src);
378,856✔
993
        TRASH(itr);
378,856✔
994
}
378,856✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc