• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / tarantool / #10852

pending completion
#10852

push

travis-ci

Gerold103
lua: implement json path access to tuple fields

94 of 94 new or added lines in 3 files covered. (100.0%)

38272 of 46993 relevant lines covered (81.44%)

846550.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.75
/src/box/vy_stmt.c
1
/*
2
 * Copyright 2010-2016, Tarantool AUTHORS, please see AUTHORS file.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 * 1. Redistributions of source code must retain the above
9
 *    copyright notice, this list of conditions and the
10
 *    following disclaimer.
11
 *
12
 * 2. Redistributions in binary form must reproduce the above
13
 *    copyright notice, this list of conditions and the following
14
 *    disclaimer in the documentation and/or other materials
15
 *    provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
18
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
19
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
21
 * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
22
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
25
 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
28
 * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
29
 * SUCH DAMAGE.
30
 */
31

32
#include "vy_stmt.h"
33

34
#include <stdlib.h>
35
#include <string.h>
36
#include <sys/uio.h> /* struct iovec */
37
#include <pmatomic.h> /* for refs */
38

39
#include "diag.h"
40
#include <small/region.h>
41
#include <small/lsregion.h>
42

43
#include "error.h"
44
#include "tuple_format.h"
45
#include "xrow.h"
46
#include "fiber.h"
47

48
struct tuple_format_vtab vy_tuple_format_vtab = {
49
        vy_tuple_delete,
50
};
51

52
size_t vy_max_tuple_size = 1024 * 1024;
53

54
void
55
vy_tuple_delete(struct tuple_format *format, struct tuple *tuple)
5,089,807✔
56
{
57
        say_debug("%s(%p)", __func__, tuple);
5,089,807✔
58
        assert(tuple->refs == 0);
5,089,817✔
59
        /*
60
         * Turn off formats referencing in worker threads to avoid
61
         * multithread unsafe modifications of a reference
62
         * counter.
63
         */
64
        if (cord_is_main())
5,089,817✔
65
                tuple_format_unref(format);
4,871,653✔
66
#ifndef NDEBUG
67
        memset(tuple, '#', tuple_size(tuple)); /* fail early */
5,089,815✔
68
#endif
69
        free(tuple);
5,089,815✔
70
}
5,089,815✔
71

72
/**
73
 * Allocate a vinyl statement object on base of the struct tuple
74
 * with malloc() and the reference counter equal to 1.
75
 * @param format Format of an index.
76
 * @param size   Size of the variable part of the statement. It
77
 *               includes size of MessagePack tuple data and, for
78
 *               upserts, MessagePack array of operations.
79
 * @retval not NULL Success.
80
 * @retval     NULL Memory error.
81
 */
82
static struct tuple *
83
vy_stmt_alloc(struct tuple_format *format, uint32_t bsize)
5,184,653✔
84
{
85
        uint32_t meta_size = tuple_format_meta_size(format);
5,184,653✔
86
        uint32_t total_size = sizeof(struct vy_stmt) + meta_size + bsize;
5,184,680✔
87
        if (unlikely(total_size > vy_max_tuple_size)) {
5,184,680✔
88
                diag_set(ClientError, ER_VINYL_MAX_TUPLE_SIZE,
2✔
89
                         (unsigned) total_size);
90
                error_log(diag_last_error(diag_get()));
2✔
91
                return NULL;
2✔
92
        }
93
        struct tuple *tuple = malloc(total_size);
5,184,678✔
94
        if (unlikely(tuple == NULL)) {
5,184,678✔
95
                diag_set(OutOfMemory, total_size, "malloc", "struct vy_stmt");
×
96
                return NULL;
×
97
        }
98
        say_debug("vy_stmt_alloc(format = %d %u, bsize = %zu) = %p",
5,184,678✔
99
                format->id, tuple_format_meta_size(format), bsize, tuple);
100
        tuple->refs = 1;
5,184,690✔
101
        tuple->format_id = tuple_format_id(format);
5,184,690✔
102
        if (cord_is_main())
5,184,706✔
103
                tuple_format_ref(format);
4,966,488✔
104
        tuple->bsize = bsize;
5,184,708✔
105
        tuple->data_offset = sizeof(struct vy_stmt) + meta_size;;
5,184,708✔
106
        vy_stmt_set_lsn(tuple, 0);
5,184,708✔
107
        vy_stmt_set_type(tuple, 0);
5,184,705✔
108
        return tuple;
5,184,701✔
109
}
110

111
struct tuple *
112
vy_stmt_dup(const struct tuple *stmt)
3,545,806✔
113
{
114
        /*
115
         * We don't use tuple_new() to avoid the initializing of
116
         * tuple field map. This map can be simple memcopied from
117
         * the original tuple.
118
         */
119
        struct tuple *res = vy_stmt_alloc(tuple_format(stmt), stmt->bsize);
3,545,806✔
120
        if (res == NULL)
3,545,806✔
121
                return NULL;
×
122
        assert(tuple_size(res) == tuple_size(stmt));
3,545,806✔
123
        assert(res->data_offset == stmt->data_offset);
3,545,806✔
124
        memcpy(res, stmt, tuple_size(stmt));
3,545,806✔
125
        res->refs = 1;
3,545,806✔
126
        return res;
3,545,806✔
127
}
128

129
struct tuple *
130
vy_stmt_dup_lsregion(const struct tuple *stmt, struct lsregion *lsregion,
600,705✔
131
                     int64_t alloc_id)
132
{
133
        enum iproto_type type = vy_stmt_type(stmt);
600,705✔
134
        size_t size = tuple_size(stmt);
600,705✔
135
        size_t alloc_size = size;
600,705✔
136
        struct tuple *mem_stmt;
137

138
        /* Reserve one byte for UPSERT counter. */
139
        if (type == IPROTO_UPSERT)
600,705✔
140
                alloc_size++;
35,245✔
141

142
        mem_stmt = lsregion_alloc(lsregion, alloc_size, alloc_id);
600,705✔
143
        if (mem_stmt == NULL) {
600,705✔
144
                diag_set(OutOfMemory, size, "lsregion_alloc", "mem_stmt");
×
145
                return NULL;
×
146
        }
147

148
        if (type == IPROTO_UPSERT) {
600,705✔
149
                *(uint8_t *)mem_stmt = 0;
35,245✔
150
                mem_stmt = (struct tuple *)((uint8_t *)mem_stmt + 1);
35,245✔
151
        }
152

153
        memcpy(mem_stmt, stmt, size);
600,705✔
154
        /*
155
         * Region allocated statements can't be referenced or unreferenced
156
         * because they are located in monolithic memory region. Referencing has
157
         * sense only for separately allocated memory blocks.
158
         * The reference count here is set to 0 for an assertion if somebody
159
         * will try to unreference this statement.
160
         */
161
        mem_stmt->refs = 0;
600,705✔
162
        return mem_stmt;
600,705✔
163
}
164

165
/**
166
 * Create the key statement from raw MessagePack data.
167
 * @param format     Format of an index.
168
 * @param key        MessagePack data that contain an array of
169
 *                   fields WITHOUT the array header.
170
 * @param part_count Count of the key fields that will be saved as
171
 *                   result.
172
 *
173
 * @retval not NULL Success.
174
 * @retval     NULL Memory allocation error.
175
 */
176
struct tuple *
177
vy_stmt_new_select(struct tuple_format *format, const char *key,
242,643✔
178
                   uint32_t part_count)
179
{
180
        assert(part_count == 0 || key != NULL);
242,643✔
181
        /* Key don't have field map */
182
        assert(format->field_map_size == 0);
242,643✔
183

184
        /* Calculate key length */
185
        const char *key_end = key;
242,643✔
186
        for (uint32_t i = 0; i < part_count; i++)
522,316✔
187
                mp_next(&key_end);
279,673✔
188

189
        /* Allocate stmt */
190
        uint32_t key_size = key_end - key;
242,643✔
191
        uint32_t bsize = mp_sizeof_array(part_count) + key_size;
242,643✔
192
        struct tuple *stmt = vy_stmt_alloc(format, bsize);
242,643✔
193
        if (stmt == NULL)
242,643✔
194
                return NULL;
242,643✔
195
        /* Copy MsgPack data */
196
        char *raw = (char *) stmt + sizeof(struct vy_stmt);
242,643✔
197
        char *data = mp_encode_array(raw, part_count);
242,643✔
198
        memcpy(data, key, key_size);
242,643✔
199
        assert(data + key_size == raw + bsize);
242,643✔
200
        vy_stmt_set_type(stmt, IPROTO_SELECT);
242,643✔
201
        return stmt;
242,643✔
202
}
203

204
char *
205
vy_key_dup(const char *key)
50,576✔
206
{
207
        assert(mp_typeof(*key) == MP_ARRAY);
101,152✔
208
        const char *end = key;
50,576✔
209
        mp_next(&end);
50,576✔
210
        char *res = malloc(end - key);
50,668✔
211
        if (res == NULL) {
50,668✔
212
                diag_set(OutOfMemory, end - key, "malloc", "key");
×
213
                return NULL;
50,684✔
214
        }
215
        memcpy(res, key, end - key);
50,668✔
216
        return res;
50,668✔
217
}
218

219
/**
220
 * Create a statement without type and with reserved space for operations.
221
 * Operations can be saved in the space available by @param extra.
222
 * For details @sa struct vy_stmt comment.
223
 */
224
static struct tuple *
225
vy_stmt_new_with_ops(struct tuple_format *format, const char *tuple_begin,
819,720✔
226
                     const char *tuple_end, struct iovec *ops,
227
                     int op_count, enum iproto_type type)
228
{
229
        mp_tuple_assert(tuple_begin, tuple_end);
819,720✔
230

231
        const char *tmp = tuple_begin;
819,814✔
232
        uint32_t field_count = mp_decode_array(&tmp);
819,784✔
233
        assert(field_count >= format->min_field_count);
819,784✔
234
        (void) field_count;
235

236
        size_t ops_size = 0;
819,784✔
237
        for (int i = 0; i < op_count; ++i)
873,189✔
238
                ops_size += ops[i].iov_len;
53,405✔
239

240
        /*
241
         * Allocate stmt. Offsets: one per key part + offset of the
242
         * statement end.
243
         */
244
        size_t mpsize = (tuple_end - tuple_begin);
819,784✔
245
        size_t bsize = mpsize + ops_size;
819,784✔
246
        struct tuple *stmt = vy_stmt_alloc(format, bsize);
819,784✔
247
        if (stmt == NULL)
819,848✔
248
                return NULL;
819,810✔
249
        /* Copy MsgPack data */
250
        char *raw = (char *) tuple_data(stmt);
819,807✔
251
        char *wpos = raw;
819,780✔
252
        memcpy(wpos, tuple_begin, mpsize);
819,780✔
253
        wpos += mpsize;
819,780✔
254
        for (struct iovec *op = ops, *end = ops + op_count;
1,692,966✔
255
             op != end; ++op) {
53,406✔
256
                memcpy(wpos, op->iov_base, op->iov_len);
53,406✔
257
                wpos += op->iov_len;
53,406✔
258
        }
259
        vy_stmt_set_type(stmt, type);
819,780✔
260

261
        /* Calculate offsets for key parts */
262
        if (tuple_init_field_map(format, (uint32_t *) raw, raw)) {
819,788✔
263
                tuple_unref(stmt);
×
264
                return NULL;
×
265
        }
266
        return stmt;
819,728✔
267
}
268

269
struct tuple *
270
vy_stmt_new_upsert(struct tuple_format *format, const char *tuple_begin,
53,396✔
271
                   const char *tuple_end, struct iovec *operations,
272
                   uint32_t ops_cnt)
273
{
274
        return vy_stmt_new_with_ops(format, tuple_begin, tuple_end,
53,396✔
275
                                    operations, ops_cnt, IPROTO_UPSERT);
276
}
277

278
struct tuple *
279
vy_stmt_new_replace(struct tuple_format *format, const char *tuple_begin,
152,576✔
280
                    const char *tuple_end)
281
{
282
        return vy_stmt_new_with_ops(format, tuple_begin, tuple_end,
152,576✔
283
                                    NULL, 0, IPROTO_REPLACE);
284
}
285

286
struct tuple *
287
vy_stmt_new_insert(struct tuple_format *format, const char *tuple_begin,
258,730✔
288
                   const char *tuple_end)
289
{
290
        return vy_stmt_new_with_ops(format, tuple_begin, tuple_end,
258,730✔
291
                                    NULL, 0, IPROTO_INSERT);
292
}
293

294
struct tuple *
295
vy_stmt_replace_from_upsert(const struct tuple *upsert)
86,788✔
296
{
297
        assert(vy_stmt_type(upsert) == IPROTO_UPSERT);
86,788✔
298
        /* Get statement size without UPSERT operations */
299
        uint32_t bsize;
300
        vy_upsert_data_range(upsert, &bsize);
86,788✔
301
        assert(bsize <= upsert->bsize);
86,788✔
302

303
        /* Copy statement data excluding UPSERT operations */
304
        struct tuple_format *format = tuple_format(upsert);
86,788✔
305
        struct tuple *replace = vy_stmt_alloc(format, bsize);
86,788✔
306
        if (replace == NULL)
86,788✔
307
                return NULL;
86,788✔
308
        /* Copy both data and field_map. */
309
        char *dst = (char *)replace + sizeof(struct vy_stmt);
86,788✔
310
        char *src = (char *)upsert + sizeof(struct vy_stmt);
86,788✔
311
        memcpy(dst, src, format->field_map_size + bsize);
86,788✔
312
        vy_stmt_set_type(replace, IPROTO_REPLACE);
86,788✔
313
        vy_stmt_set_lsn(replace, vy_stmt_lsn(upsert));
86,788✔
314
        return replace;
86,788✔
315
}
316

317
static struct tuple *
318
vy_stmt_new_surrogate_from_key(const char *key, enum iproto_type type,
471,019✔
319
                               const struct key_def *cmp_def,
320
                               struct tuple_format *format)
321
{
322
        /* UPSERT can't be surrogate. */
323
        assert(type != IPROTO_UPSERT);
471,019✔
324
        struct region *region = &fiber()->gc;
471,019✔
325

326
        uint32_t field_count = format->index_field_count;
471,019✔
327
        struct iovec *iov = region_alloc(region, sizeof(*iov) * field_count);
471,019✔
328
        if (iov == NULL) {
471,023✔
329
                diag_set(OutOfMemory, sizeof(*iov) * field_count,
×
330
                         "region", "iov for surrogate key");
331
                return NULL;
×
332
        }
333
        memset(iov, 0, sizeof(*iov) * field_count);
471,023✔
334
        uint32_t part_count = mp_decode_array(&key);
471,020✔
335
        assert(part_count == cmp_def->part_count);
471,020✔
336
        assert(part_count <= field_count);
471,020✔
337
        uint32_t nulls_count = field_count - cmp_def->part_count;
471,020✔
338
        uint32_t bsize = mp_sizeof_array(field_count) +
942,041✔
339
                         mp_sizeof_nil() * nulls_count;
471,020✔
340
        for (uint32_t i = 0; i < part_count; ++i) {
1,344,014✔
341
                const struct key_part *part = &cmp_def->parts[i];
872,989✔
342
                assert(part->fieldno < field_count);
872,989✔
343
                const char *svp = key;
872,990✔
344
                iov[part->fieldno].iov_base = (char *) key;
872,990✔
345
                mp_next(&key);
872,990✔
346
                iov[part->fieldno].iov_len = key - svp;
872,993✔
347
                bsize += key - svp;
872,993✔
348
        }
349

350
        struct tuple *stmt = vy_stmt_alloc(format, bsize);
471,025✔
351
        if (stmt == NULL)
471,025✔
352
                return NULL;
×
353

354
        char *raw = (char *) tuple_data(stmt);
471,025✔
355
        uint32_t *field_map = (uint32_t *) raw;
471,025✔
356
        char *wpos = mp_encode_array(raw, field_count);
471,025✔
357
        for (uint32_t i = 0; i < field_count; ++i) {
1,517,824✔
358
                const struct tuple_field *field = &format->fields[i];
1,046,800✔
359
                if (field->offset_slot != TUPLE_OFFSET_SLOT_NIL)
1,046,800✔
360
                        field_map[field->offset_slot] = wpos - raw;
408,933✔
361
                if (iov[i].iov_base == NULL) {
1,046,800✔
362
                        wpos = mp_encode_nil(wpos);
173,812✔
363
                } else {
364
                        memcpy(wpos, iov[i].iov_base, iov[i].iov_len);
872,988✔
365
                        wpos += iov[i].iov_len;
872,988✔
366
                }
367
        }
368
        assert(wpos == raw + bsize);
471,024✔
369
        vy_stmt_set_type(stmt, type);
471,024✔
370
        return stmt;
471,022✔
371
}
372

373
struct tuple *
374
vy_stmt_new_surrogate_delete_from_key(const char *key,
174,181✔
375
                                      const struct key_def *cmp_def,
376
                                      struct tuple_format *format)
377
{
378
        return vy_stmt_new_surrogate_from_key(key, IPROTO_DELETE,
174,181✔
379
                                              cmp_def, format);
380
}
381

382
struct tuple *
383
vy_stmt_new_surrogate_delete(struct tuple_format *format,
18,643✔
384
                             const struct tuple *src)
385
{
386
        uint32_t src_size;
387
        const char *src_data = tuple_data_range(src, &src_size);
18,643✔
388
        uint32_t total_size = src_size + format->field_map_size;
18,643✔
389
        /* Surrogate tuple uses less memory than the original tuple */
390
        char *data = region_alloc(&fiber()->gc, total_size);
18,643✔
391
        if (data == NULL) {
18,643✔
392
                diag_set(OutOfMemory, src_size, "region", "tuple");
×
393
                return NULL;
18,643✔
394
        }
395
        char *field_map_begin = data + src_size;
18,643✔
396
        uint32_t *field_map = (uint32_t *) (data + total_size);
18,643✔
397

398
        const char *src_pos = src_data;
18,643✔
399
        uint32_t src_count = mp_decode_array(&src_pos);
18,643✔
400
        assert(src_count >= format->min_field_count);
18,643✔
401
        uint32_t field_count;
402
        if (src_count < format->index_field_count) {
18,643✔
403
                field_count = src_count;
2✔
404
                /*
405
                 * Nullify field map to be able to detect by 0,
406
                 * which key fields are absent in tuple_field().
407
                 */
408
                memset((char *)field_map - format->field_map_size, 0,
2✔
409
                       format->field_map_size);
2✔
410
        } else {
411
                field_count = format->index_field_count;
18,641✔
412
        }
413
        char *pos = mp_encode_array(data, field_count);
18,643✔
414
        for (uint32_t i = 0; i < field_count; ++i) {
70,115✔
415
                const struct tuple_field *field = &format->fields[i];
51,472✔
416
                if (! field->is_key_part) {
51,472✔
417
                        /* Unindexed field - write NIL. */
418
                        assert(i < src_count);
41✔
419
                        pos = mp_encode_nil(pos);
41✔
420
                        mp_next(&src_pos);
41✔
421
                        continue;
41✔
422
                }
423
                /* Indexed field - copy */
424
                const char *src_field = src_pos;
51,431✔
425
                mp_next(&src_pos);
51,431✔
426
                memcpy(pos, src_field, src_pos - src_field);
51,431✔
427
                if (field->offset_slot != TUPLE_OFFSET_SLOT_NIL)
51,431✔
428
                        field_map[field->offset_slot] = pos - data;
32,786✔
429
                pos += src_pos - src_field;
51,431✔
430
        }
431
        assert(pos <= data + src_size);
18,643✔
432
        uint32_t bsize = pos - data;
18,643✔
433
        struct tuple *stmt = vy_stmt_alloc(format, bsize);
18,643✔
434
        if (stmt == NULL)
18,643✔
435
                return NULL;
×
436
        char *stmt_data = (char *) tuple_data(stmt);
18,643✔
437
        char *stmt_field_map_begin = stmt_data - format->field_map_size;
18,643✔
438
        memcpy(stmt_data, data, bsize);
18,643✔
439
        memcpy(stmt_field_map_begin, field_map_begin, format->field_map_size);
18,643✔
440
        vy_stmt_set_type(stmt, IPROTO_DELETE);
18,643✔
441

442
        return stmt;
18,643✔
443
}
444

445
struct tuple *
446
vy_stmt_extract_key(const struct tuple *stmt, const struct key_def *key_def,
24,309✔
447
                    struct tuple_format *format)
448
{
449
        struct region *region = &fiber()->gc;
24,309✔
450
        size_t region_svp = region_used(region);
24,309✔
451
        const char *key_raw = tuple_extract_key(stmt, key_def, NULL);
24,309✔
452
        if (key_raw == NULL)
24,309✔
453
                return NULL;
24,309✔
454
        uint32_t part_count = mp_decode_array(&key_raw);
24,309✔
455
        assert(part_count == key_def->part_count);
24,309✔
456
        struct tuple *key = vy_stmt_new_select(format, key_raw, part_count);
24,309✔
457
        /* Cleanup memory allocated by tuple_extract_key(). */
458
        region_truncate(region, region_svp);
24,309✔
459
        return key;
24,309✔
460
}
461

462
struct tuple *
463
vy_stmt_extract_key_raw(const char *data, const char *data_end,
128✔
464
                        const struct key_def *key_def,
465
                        struct tuple_format *format)
466
{
467
        struct region *region = &fiber()->gc;
128✔
468
        size_t region_svp = region_used(region);
128✔
469
        const char *key_raw = tuple_extract_key_raw(data, data_end,
128✔
470
                                                    key_def, NULL);
471
        if (key_raw == NULL)
128✔
472
                return NULL;
128✔
473
        uint32_t part_count = mp_decode_array(&key_raw);
128✔
474
        assert(part_count == key_def->part_count);
128✔
475
        struct tuple *key = vy_stmt_new_select(format, key_raw, part_count);
128✔
476
        /* Cleanup memory allocated by tuple_extract_key_raw(). */
477
        region_truncate(region, region_svp);
128✔
478
        return key;
128✔
479
}
480

481
int
482
vy_stmt_encode_primary(const struct tuple *value,
211,410✔
483
                       const struct key_def *key_def, uint32_t space_id,
484
                       struct xrow_header *xrow)
485
{
486
        memset(xrow, 0, sizeof(*xrow));
211,410✔
487
        enum iproto_type type = vy_stmt_type(value);
211,410✔
488
        xrow->type = type;
211,403✔
489
        xrow->lsn = vy_stmt_lsn(value);
211,403✔
490

491
        struct request request;
492
        memset(&request, 0, sizeof(request));
211,334✔
493
        request.type = type;
211,334✔
494
        request.space_id = space_id;
211,334✔
495
        uint32_t size;
496
        const char *extracted = NULL;
211,334✔
497
        switch (type) {
211,334✔
498
        case IPROTO_DELETE:
499
                /* extract key */
500
                extracted = tuple_extract_key(value, key_def, &size);
26,776✔
501
                if (extracted == NULL)
26,811✔
502
                        return -1;
211,415✔
503
                request.key = extracted;
26,776✔
504
                request.key_end = request.key + size;
26,776✔
505
                break;
26,776✔
506
        case IPROTO_INSERT:
507
        case IPROTO_REPLACE:
508
                request.tuple = tuple_data_range(value, &size);
178,141✔
509
                request.tuple_end = request.tuple + size;
178,202✔
510
                break;
178,202✔
511
        case IPROTO_UPSERT:
512
                request.tuple = vy_upsert_data_range(value, &size);
6,417✔
513
                request.tuple_end = request.tuple + size;
6,417✔
514
                /* extract operations */
515
                request.ops = vy_stmt_upsert_ops(value, &size);
6,417✔
516
                request.ops_end = request.ops + size;
6,417✔
517
                break;
6,417✔
518
        default:
519
                unreachable();
×
520
        }
521
        xrow->bodycnt = xrow_encode_dml(&request, xrow->body);
211,395✔
522
        if (xrow->bodycnt < 0)
211,345✔
523
                return -1;
×
524
        return 0;
211,345✔
525
}
526

527
int
528
vy_stmt_encode_secondary(const struct tuple *value,
108,588✔
529
                         const struct key_def *cmp_def,
530
                         struct xrow_header *xrow)
531
{
532
        memset(xrow, 0, sizeof(*xrow));
108,588✔
533
        enum iproto_type type = vy_stmt_type(value);
108,588✔
534
        xrow->type = type;
108,596✔
535
        xrow->lsn = vy_stmt_lsn(value);
108,596✔
536

537
        struct request request;
538
        memset(&request, 0, sizeof(request));
108,607✔
539
        request.type = type;
108,607✔
540
        uint32_t size;
541
        const char *extracted = tuple_extract_key(value, cmp_def, &size);
108,607✔
542
        if (extracted == NULL)
108,627✔
543
                return -1;
108,571✔
544
        if (type == IPROTO_REPLACE || type == IPROTO_INSERT) {
108,612✔
545
                request.tuple = extracted;
105,972✔
546
                request.tuple_end = extracted + size;
105,972✔
547
        } else {
548
                assert(type == IPROTO_DELETE);
2,640✔
549
                request.key = extracted;
2,640✔
550
                request.key_end = extracted + size;
2,640✔
551
        }
552
        xrow->bodycnt = xrow_encode_dml(&request, xrow->body);
108,612✔
553
        if (xrow->bodycnt < 0)
108,541✔
554
                return -1;
×
555
        else
556
                return 0;
108,541✔
557
}
558

559
struct tuple *
560
vy_stmt_decode(struct xrow_header *xrow, const struct key_def *key_def,
668,182✔
561
               struct tuple_format *format, bool is_primary)
562
{
563
        struct request request;
564
        uint64_t key_map = dml_request_key_map(xrow->type);
668,182✔
565
        key_map &= ~(1ULL << IPROTO_SPACE_ID); /* space_id is optional */
668,177✔
566
        if (xrow_decode_dml(xrow, &request, key_map) != 0)
668,177✔
567
                return NULL;
668,136✔
568
        struct tuple *stmt = NULL;
668,146✔
569
        const char *key;
570
        (void) key;
571
        struct iovec ops;
572
        switch (request.type) {
668,146✔
573
        case IPROTO_DELETE:
574
                /* extract key */
575
                stmt = vy_stmt_new_surrogate_from_key(request.key,
56,566✔
576
                                                      IPROTO_DELETE,
577
                                                      key_def, format);
578
                break;
56,479✔
579
        case IPROTO_INSERT:
580
        case IPROTO_REPLACE:
581
                if (is_primary) {
595,324✔
582
                        stmt = vy_stmt_new_with_ops(format, request.tuple,
355,049✔
583
                                                    request.tuple_end,
584
                                                    NULL, 0, request.type);
355,049✔
585
                } else {
586
                        stmt = vy_stmt_new_surrogate_from_key(request.tuple,
240,275✔
587
                                                              request.type,
240,275✔
588
                                                              key_def, format);
589
                }
590
                break;
595,366✔
591
        case IPROTO_UPSERT:
592
                ops.iov_base = (char *)request.ops;
16,256✔
593
                ops.iov_len = request.ops_end - request.ops;
16,256✔
594
                stmt = vy_stmt_new_upsert(format, request.tuple,
16,256✔
595
                                          request.tuple_end, &ops, 1);
596
                break;
16,256✔
597
        default:
598
                /* TODO: report filename. */
599
                diag_set(ClientError, ER_INVALID_RUN_FILE,
×
600
                         tt_sprintf("Can't decode statement: "
601
                                    "unknown request type %u",
602
                                    (unsigned)request.type));
603
                return NULL;
×
604
        }
605

606
        if (stmt == NULL)
668,101✔
607
                return NULL; /* OOM */
×
608

609
        vy_stmt_set_lsn(stmt, xrow->lsn);
668,101✔
610
        return stmt;
668,118✔
611
}
612

613
int
614
vy_stmt_snprint(char *buf, int size, const struct tuple *stmt)
2✔
615
{
616
        int total = 0;
2✔
617
        uint32_t mp_size;
618
        if (stmt == NULL) {
2✔
619
                SNPRINT(total, snprintf, buf, size, "<NULL>");
2✔
620
                return total;
×
621
        }
622
        SNPRINT(total, snprintf, buf, size, "%s(",
2✔
623
                iproto_type_name(vy_stmt_type(stmt)));
624
                SNPRINT(total, mp_snprint, buf, size, tuple_data(stmt));
2✔
625
        if (vy_stmt_type(stmt) == IPROTO_UPSERT) {
2✔
626
                SNPRINT(total, snprintf, buf, size, ", ops=");
×
627
                SNPRINT(total, mp_snprint, buf, size,
×
628
                        vy_stmt_upsert_ops(stmt, &mp_size));
629
        }
630
        SNPRINT(total, snprintf, buf, size, ", lsn=%lld)",
2✔
631
                (long long) vy_stmt_lsn(stmt));
632
        return total;
2✔
633
}
634

635
const char *
636
vy_stmt_str(const struct tuple *stmt)
2✔
637
{
638
        char *buf = tt_static_buf();
2✔
639
        if (vy_stmt_snprint(buf, TT_STATIC_BUF_LEN, stmt) < 0)
2✔
640
                return "<failed to format statement>";
×
641
        return buf;
2✔
642
}
643

644
struct tuple_format *
645
vy_tuple_format_new_with_colmask(struct tuple_format *mem_format)
1,870✔
646
{
647
        struct tuple_format *format = tuple_format_dup(mem_format);
1,870✔
648
        if (format == NULL)
1,870✔
649
                return NULL;
×
650
        /* + size of column mask. */
651
        assert(format->extra_size == 0);
1,870✔
652
        format->extra_size = sizeof(uint64_t);
1,870✔
653
        return format;
1,870✔
654
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc