• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / tarantool / 4275

pending completion
4275

push

travis-ci

GeorgyKirichenko
Merge remote-tracking branch 'origin/gh_1446_phia_test_v2' into 1.7

22313 of 27809 relevant lines covered (80.24%)

311402.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.71
/src/box/phia_space.cc
1
/*
2
 * Copyright 2010-2016, Tarantool AUTHORS, please see AUTHORS file.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 * 1. Redistributions of source code must retain the above
9
 *    copyright notice, this list of conditions and the
10
 *    following disclaimer.
11
 *
12
 * 2. Redistributions in binary form must reproduce the above
13
 *    copyright notice, this list of conditions and the following
14
 *    disclaimer in the documentation and/or other materials
15
 *    provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
18
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
19
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
21
 * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
22
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
25
 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
28
 * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
29
 * SUCH DAMAGE.
30
 */
31
#include "phia_engine.h"
32
#include "phia_index.h"
33
#include "phia_space.h"
34
#include "xrow.h"
35
#include "tuple.h"
36
#include "scoped_guard.h"
37
#include "txn.h"
38
#include "index.h"
39
#include "space.h"
40
#include "schema.h"
41
#include "port.h"
42
#include "request.h"
43
#include "iproto_constants.h"
44
#include "phia.h"
45
#include <stdlib.h>
46
#include <stdio.h>
47
#include <string.h>
48

49
/** Create a Phia space handler; the engine pointer is passed to Handler. */
PhiaSpace::PhiaSpace(Engine *e)
        : Handler(e)
{
}
224✔
52

53
void
54
PhiaSpace::applySnapshotRow(struct space *space, struct request *request)
125✔
55
{
56
        assert(request->type == IPROTO_INSERT);
57
        PhiaIndex *index = (PhiaIndex *)index_find(space, 0);
125✔
58

59
        space_validate_tuple_raw(space, request->tuple);
125✔
60
        int size = request->tuple_end - request->tuple;
125✔
61
        const char *key = tuple_field_raw(request->tuple, size,
62
                                          index->key_def->parts[0].fieldno);
125✔
63
        primary_key_validate(index->key_def, key, index->key_def->part_count);
125✔
64

65
        const char *value = NULL;
125✔
66
        struct phia_document *obj = index->createDocument(key, &value);
125✔
67
        size_t valuesize = size - (value - request->tuple);
125✔
68
        if (valuesize > 0)
125✔
69
                phia_document_set_field(obj, "value", value, valuesize);
125✔
70

71
        assert(request->header != NULL);
72

73
        struct phia_tx *tx = phia_begin(index->env);
125✔
74
        if (tx == NULL) {
125✔
75
                phia_document_delete(obj);
×
76
                phia_raise();
×
77
        }
78

79
        int64_t signature = request->header->lsn;
125✔
80
        phia_tx_set_lsn(tx, signature);
125✔
81

82
        if (phia_replace(tx, obj) != 0)
125✔
83
                phia_raise();
×
84
        /* obj destroyed by phia_replace() */
85

86
        int rc = phia_commit(tx);
125✔
87
        switch (rc) {
125✔
88
        case 0:
89
                return;
125✔
90
        case 1: /* rollback */
91
                return;
92
        case 2: /* lock */
93
                phia_rollback(tx);
×
94
                /* must never happen during JOIN */
95
                tnt_raise(ClientError, ER_TRANSACTION_CONFLICT);
×
96
                return;
97
        case -1:
98
                phia_raise();
×
99
                return;
100
        default:
101
                assert(0);
102
        }
103
}
104

105
struct tuple *
106
PhiaSpace::executeReplace(struct txn*,
12,535✔
107
                          struct space *space,
108
                          struct request *request)
109
{
110
        PhiaIndex *index = (PhiaIndex *)index_find(space, 0);
12,535✔
111

112
        space_validate_tuple_raw(space, request->tuple);
12,533✔
113

114
        int size = request->tuple_end - request->tuple;
12,531✔
115
        const char *key =
116
                tuple_field_raw(request->tuple, size,
117
                                index->key_def->parts[0].fieldno);
12,531✔
118
        primary_key_validate(index->key_def, key, index->key_def->part_count);
12,531✔
119

120
        /* unique constraint */
121
        if (request->type == IPROTO_INSERT) {
12,526✔
122
                enum dup_replace_mode mode = DUP_REPLACE_OR_INSERT;
11,266✔
123
                PhiaEngine *engine =
124
                        (PhiaEngine *)space->handler->engine;
11,266✔
125
                if (engine->recovery_complete)
11,266✔
126
                        mode = DUP_INSERT;
8,764✔
127
                if (mode == DUP_INSERT) {
11,266✔
128
                        struct tuple *found = index->findByKey(key, 0);
8,764✔
129
                        if (found) {
8,764✔
130
                                tuple_delete(found);
×
131
                                tnt_raise(ClientError, ER_TUPLE_FOUND,
×
132
                                                  index_name(index), space_name(space));
133
                        }
134
                }
135
        }
136

137
        /* replace */
138
        struct phia_tx *tx = (struct phia_tx *)(in_txn()->engine_tx);
12,526✔
139
        const char *value = NULL;
12,526✔
140
        struct phia_document *obj = index->createDocument(key, &value);
12,526✔
141
        size_t valuesize = size - (value - request->tuple);
12,526✔
142
        if (valuesize > 0)
12,526✔
143
                phia_document_set_field(obj, "value", value, valuesize);
6,203✔
144
        int rc;
145
        rc = phia_replace(tx, obj);
12,526✔
146
        if (rc == -1)
12,526✔
147
                phia_raise();
×
148

149
        return NULL;
12,526✔
150
}
151

152
struct tuple *
153
PhiaSpace::executeDelete(struct txn*, struct space *space,
7,266✔
154
                           struct request *request)
155
{
156
        PhiaIndex *index = (PhiaIndex *)index_find(space, request->index_id);
7,266✔
157
        const char *key = request->key;
7,266✔
158
        uint32_t part_count = mp_decode_array(&key);
7,266✔
159
        primary_key_validate(index->key_def, key, part_count);
7,266✔
160

161
        /* remove */
162
        struct phia_document *obj = index->createDocument(key, NULL);
7,263✔
163
        struct phia_tx *tx = (struct phia_tx *)(in_txn()->engine_tx);
7,263✔
164
        int rc = phia_delete(tx, obj);
7,263✔
165
        if (rc == -1)
7,263✔
166
                phia_raise();
×
167
        return NULL;
7,263✔
168
}
169

170
struct tuple *
171
PhiaSpace::executeUpdate(struct txn*, struct space *space,
5,527✔
172
                           struct request *request)
173
{
174
        /* Try to find the tuple by unique key */
175
        PhiaIndex *index = (PhiaIndex *)index_find(space, request->index_id);
5,527✔
176
        const char *key = request->key;
5,527✔
177
        uint32_t part_count = mp_decode_array(&key);
5,527✔
178
        primary_key_validate(index->key_def, key, part_count);
5,527✔
179
        struct tuple *old_tuple = index->findByKey(key, part_count);
5,524✔
180

181
        if (old_tuple == NULL)
5,524✔
182
                return NULL;
183

184
        /* Phia always yields a zero-ref tuple, GC it here. */
185
        TupleRef old_ref(old_tuple);
186

187
        /* Do tuple update */
188
        struct tuple *new_tuple =
189
                tuple_update(space->format,
190
                             region_aligned_alloc_xc_cb,
191
                             &fiber()->gc,
192
                             old_tuple, request->tuple,
193
                             request->tuple_end,
194
                             request->index_base);
5,520✔
195
        TupleRef ref(new_tuple);
5,520✔
196

197
        space_validate_tuple(space, new_tuple);
5,520✔
198
        space_check_update(space, old_tuple, new_tuple);
5,520✔
199

200
        /* replace */
201
        key = tuple_field_raw(new_tuple->data, new_tuple->bsize,
202
                              index->key_def->parts[0].fieldno);
5,520✔
203
        struct phia_tx *tx = (struct phia_tx *)(in_txn()->engine_tx);
5,520✔
204
        const char *value = NULL;
5,520✔
205
        struct phia_document *obj = index->createDocument(key, &value);
5,520✔
206
        size_t valuesize = new_tuple->bsize - (value - new_tuple->data);
5,520✔
207
        if (valuesize > 0)
5,520✔
208
                phia_document_set_field(obj, "value", value, valuesize);
5,520✔
209
        int rc;
210
        rc = phia_replace(tx, obj);
5,520✔
211
        if (rc == -1)
5,520✔
212
                phia_raise();
×
213
        return NULL;
5,520✔
214
}
215

216
static inline int
217
phia_upsert_prepare(char **src, uint32_t *src_size,
20,885✔
218
                      char **mp, uint32_t *mp_size, uint32_t *mp_size_key,
219
                      struct key_def *key_def)
220
{
221
        /* calculate msgpack size */
222
        uint32_t i = 0;
20,885✔
223
        *mp_size_key = 0;
20,885✔
224
        while (i < key_def->part_count) {
62,855✔
225
                if (key_def->parts[i].type == STRING)
21,085✔
226
                        *mp_size_key += mp_sizeof_str(src_size[i]);
230✔
227
                else
228
                        *mp_size_key += mp_sizeof_uint(load_u64(src[i]));
62,910✔
229
                i++;
21,085✔
230
        }
231

232
        /* count msgpack fields */
233
        uint32_t count = key_def->part_count;
20,885✔
234
        uint32_t value_field = key_def->part_count;
20,885✔
235
        uint32_t value_size = src_size[value_field];
20,885✔
236
        char *p = src[value_field];
20,885✔
237
        char *end = p + value_size;
20,885✔
238
        while (p < end) {
62,880✔
239
                count++;
21,110✔
240
                mp_next((const char **)&p);
21,110✔
241
        }
242

243
        /* allocate and encode tuple */
244
        *mp_size = mp_sizeof_array(count) + *mp_size_key + value_size;
20,885✔
245
        *mp = (char *)malloc(*mp_size);
20,885✔
246
        if (mp == NULL)
20,885✔
247
                return -1;
248
        p = *mp;
20,885✔
249
        p = mp_encode_array(p, count);
41,770✔
250
        i = 0;
20,885✔
251
        while (i < key_def->part_count) {
62,855✔
252
                if (key_def->parts[i].type == STRING)
21,085✔
253
                        p = mp_encode_str(p, src[i], src_size[i]);
115✔
254
                else
255
                        p = mp_encode_uint(p, load_u64(src[i]));
41,940✔
256
                i++;
21,085✔
257
        }
258
        memcpy(p, src[value_field], src_size[value_field]);
20,885✔
259
        return 0;
20,885✔
260
}
261

262
/**
 * A fixed-capacity list of malloc()ed chunks that are released together
 * by phia_mempool_free().
 */
struct phia_mempool {
        /* Chunk pointers; only the first `count` entries are in use. */
        void *chunks[128];
        /* Number of entries of `chunks` currently in use. */
        int count;
};

/** Put the pool into the empty state. */
static inline void
phia_mempool_init(phia_mempool *p)
{
        p->count = 0;
}

/**
 * free() every chunk recorded in the pool and mark the pool empty, so
 * that calling this function again (or reusing the pool) is safe and
 * cannot free the same chunk twice.
 */
static inline void
phia_mempool_free(phia_mempool *p)
{
        for (int i = 0; i < p->count; i++)
                free(p->chunks[i]);
        p->count = 0;
}
282

283
static void *
284
phia_update_alloc(void *arg, size_t size)
146,225✔
285
{
286
        /* simulate region allocator for use with
287
         * tuple_upsert_execute() */
288
        struct phia_mempool *p = (struct phia_mempool*)arg;
146,225✔
289
        assert(p->count < 128);
290
        void *ptr = malloc(size);
146,225✔
291
        p->chunks[p->count++] = ptr;
146,225✔
292
        return ptr;
146,225✔
293
}
294

295
static inline int
296
phia_upsert_do(char **result, uint32_t *result_size,
20,885✔
297
              char *tuple, uint32_t tuple_size, uint32_t tuple_size_key,
298
              char *upsert, int upsert_size)
299
{
300
        char *p = upsert;
20,885✔
301
        uint8_t index_base = *(uint8_t *)p;
20,885✔
302
        p += sizeof(uint8_t);
20,885✔
303
        uint32_t default_tuple_size = *(uint32_t *)p;
20,885✔
304
        p += sizeof(uint32_t);
20,885✔
305
        p += default_tuple_size;
20,885✔
306
        char *expr = p;
20,885✔
307
        char *expr_end = upsert + upsert_size;
20,885✔
308
        const char *up;
309
        uint32_t up_size;
310

311
        /* emit upsert */
312
        struct phia_mempool alloc;
313
        phia_mempool_init(&alloc);
20,885✔
314
        try {
315
                up = tuple_upsert_execute(phia_update_alloc, &alloc,
316
                                          expr,
317
                                          expr_end,
318
                                          tuple,
319
                                          tuple + tuple_size,
320
                                          &up_size, index_base);
20,885✔
321
        } catch (Exception *e) {
322
                phia_mempool_free(&alloc);
323
                return -1;
324
        }
325

326
        /* skip array size and key */
327
        const char *ptr = up;
20,885✔
328
        mp_decode_array(&ptr);
329
        ptr += tuple_size_key;
20,885✔
330

331
        /* get new value */
332
        *result_size = (uint32_t)((up + up_size) -  ptr);
20,885✔
333
        *result = (char *)malloc(*result_size);
20,885✔
334
        if (! *result) {
20,885✔
335
                phia_mempool_free(&alloc);
336
                return -1;
337
        }
338
        memcpy(*result, ptr, *result_size);
20,885✔
339
        phia_mempool_free(&alloc);
340
        return 0;
341
}
342

343
int
344
phia_upsert_cb(int count,
27,217✔
345
               char **src,    uint32_t *src_size,
346
               char **upsert, uint32_t *upsert_size,
347
               char **result, uint32_t *result_size,
348
               struct key_def *key_def)
349
{
350
        uint32_t value_field;
351
        value_field = key_def->part_count;
27,217✔
352

353
        /* use default tuple value */
354
        if (src == NULL)
27,217✔
355
        {
356
                /* result key fields are initialized to upsert
357
                 * fields by default */
358
                char *p = upsert[value_field];
6,332✔
359
                p += sizeof(uint8_t); /* index base */
6,332✔
360
                uint32_t value_size = *(uint32_t *)p;
6,332✔
361
                p += sizeof(uint32_t);
6,332✔
362
                void *value = (char *)malloc(value_size);
6,332✔
363
                if (value == NULL)
6,332✔
364
                        return -1;
365
                memcpy(value, p, value_size);
6,332✔
366
                result[value_field] = (char*)value;
6,332✔
367
                result_size[value_field] = value_size;
6,332✔
368
                return 0;
6,332✔
369
        }
370

371
        /* convert src to msgpack */
372
        char *tuple;
373
        uint32_t tuple_size_key;
374
        uint32_t tuple_size;
375
        int rc;
376
        rc = phia_upsert_prepare(src, src_size,
377
                                   &tuple, &tuple_size, &tuple_size_key,
378
                                   key_def);
20,885✔
379
        if (rc == -1)
20,885✔
380
                return -1;
381

382
        /* execute upsert */
383
        rc = phia_upsert_do(&result[value_field],
384
                           &result_size[value_field],
385
                           tuple, tuple_size, tuple_size_key,
386
                           upsert[value_field],
20,885✔
387
                           upsert_size[value_field]);
41,770✔
388
        free(tuple);
20,885✔
389

390
        (void)count;
391
        (void)upsert_size;
392
        return rc;
20,885✔
393
}
394

395
void
396
PhiaSpace::executeUpsert(struct txn*, struct space *space,
11,461✔
397
                           struct request *request)
398
{
399
        PhiaIndex *index = (PhiaIndex *)index_find(space, request->index_id);
11,461✔
400

401
        /* Check field count in tuple */
402
        space_validate_tuple_raw(space, request->tuple);
11,461✔
403

404
        /* Check tuple fields */
405
        tuple_validate_raw(space->format, request->tuple);
11,460✔
406

407
        const char *expr      = request->ops;
11,457✔
408
        const char *expr_end  = request->ops_end;
11,457✔
409
        const char *tuple     = request->tuple;
11,457✔
410
        const char *tuple_end = request->tuple_end;
11,457✔
411
        uint8_t index_base    = request->index_base;
11,457✔
412

413
        /* upsert */
414
        mp_decode_array(&tuple);
415
        uint32_t expr_size  = expr_end - expr;
11,457✔
416
        uint32_t tuple_size = tuple_end - tuple;
11,457✔
417
        uint32_t tuple_value_size;
418
        const char *tuple_value;
419
        struct phia_document *obj = index->createDocument(tuple, &tuple_value);
11,457✔
420
        tuple_value_size = tuple_size - (tuple_value - tuple);
11,457✔
421
        uint32_t value_size =
422
                sizeof(uint8_t) + sizeof(uint32_t) + tuple_value_size + expr_size;
11,457✔
423
        char *value = (char *)malloc(value_size);
11,457✔
424
        if (value == NULL) {
11,457✔
425
                phia_document_delete(obj);
×
426
                tnt_raise(OutOfMemory, sizeof(value_size), "Phia Space",
×
427
                          "executeUpsert");
428
        }
429
        char *p = value;
11,457✔
430
        memcpy(p, &index_base, sizeof(uint8_t));
431
        p += sizeof(uint8_t);
11,457✔
432
        memcpy(p, &tuple_value_size, sizeof(uint32_t));
433
        p += sizeof(uint32_t);
11,457✔
434
        memcpy(p, tuple_value, tuple_value_size);
11,457✔
435
        p += tuple_value_size;
11,457✔
436
        memcpy(p, expr, expr_size);
11,457✔
437
        phia_document_set_field(obj, "value", value, value_size);
11,457✔
438
        struct phia_tx *tx = (struct phia_tx *)(in_txn()->engine_tx);
11,457✔
439
        int rc = phia_upsert(tx, obj);
11,457✔
440
        free(value);
11,457✔
441
        if (rc == -1)
11,457✔
442
                phia_raise();
×
443
}
11,457✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc