tarantool / tarantool, build 12184 (push, travis-ci), pending completion

Commit: "collation: refactoring" (Gerold103)

Simplify collation code.

17 of 17 new or added lines in 2 files covered (100.0%)
38680 of 47406 relevant lines covered (81.59%)
897251.97 hits per line

Source file: /src/box/alter.cc (91.82% of lines covered)
1
/*
2
 * Copyright 2010-2016, Tarantool AUTHORS, please see AUTHORS file.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 * 1. Redistributions of source code must retain the above
9
 *    copyright notice, this list of conditions and the
10
 *    following disclaimer.
11
 *
12
 * 2. Redistributions in binary form must reproduce the above
13
 *    copyright notice, this list of conditions and the following
14
 *    disclaimer in the documentation and/or other materials
15
 *    provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
18
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
19
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
21
 * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
22
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
25
 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
28
 * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
29
 * SUCH DAMAGE.
30
 */
31
#include "alter.h"
32
#include "schema.h"
33
#include "user.h"
34
#include "space.h"
35
#include "index.h"
36
#include "func.h"
37
#include "coll_id_cache.h"
38
#include "coll_id_def.h"
39
#include "txn.h"
40
#include "tuple.h"
41
#include "fiber.h" /* for gc_pool */
42
#include "scoped_guard.h"
43
#include "third_party/base64.h"
44
#include <new> /* for placement new */
45
#include <stdio.h> /* snprintf() */
46
#include <ctype.h>
47
#include "replication.h" /* for replica_set_id() */
48
#include "session.h" /* to fetch the current user. */
49
#include "vclock.h" /* VCLOCK_MAX */
50
#include "xrow.h"
51
#include "iproto_constants.h"
52
#include "identifier.h"
53
#include "memtx_tuple.h"
54
#include "version.h"
55
#include "sequence.h"
56

57
/**
58
 * chap-sha1 of empty string, i.e.
59
 * base64_encode(sha1(sha1("")), 0)
60
 */
61
#define CHAP_SHA1_EMPTY_PASSWORD "vhvewKp0tNyweZQ+cFKAlsyphfg="
62

63
/* {{{ Auxiliary functions and methods. */
64

65
static void
66
access_check_ddl(const char *name, uint32_t owner_uid,
330,496✔
67
                 enum schema_object_type type,
68
                 enum priv_type priv_type,
69
                 bool is_17_compat_mode)
70
{
71
        struct credentials *cr = effective_user();
330,496✔
72
        user_access_t has_access = cr->universal_access;
330,496✔
73
        /*
74
         * XXX: pre 1.7.7 there was no specific 'CREATE' or
75
         * 'ALTER' ACL, instead, read and write access on universe
76
         * was used to allow create/alter.
77
         * For backward compatibility, if a user has read and write
78
         * access on the universe, grant it CREATE access
79
         * automatically.
80
         * The legacy fix does not affect sequences since they
81
         * were added in 1.7.7 only.
82
         */
83
        if (is_17_compat_mode && has_access & PRIV_R && has_access & PRIV_W)
330,496✔
84
                has_access |= PRIV_C | PRIV_A;
317,264✔
85

86
        user_access_t access = ((PRIV_U | (user_access_t) priv_type) &
330,496✔
87
                                ~has_access);
330,496✔
88
        bool is_owner = owner_uid == cr->uid || cr->uid == ADMIN;
330,496✔
89
        /*
90
         * Only the owner of the object or someone who has
91
         * specific DDL privilege on the object can execute
92
         * DDL. If a user has no USAGE access and is owner,
93
         * deny access as well.
94
         */
95
        if (access == 0 || (is_owner && !(access & PRIV_U)))
330,496✔
96
                return; /* Access granted. */
660,962✔
97

98
        struct user *user = user_find_xc(cr->uid);
15✔
99
        if (is_owner) {
15✔
100
                tnt_raise(AccessDeniedError,
×
101
                          priv_name(PRIV_U),
102
                          schema_object_name(SC_UNIVERSE),
103
                          "",
104
                          user->def->name);
105
        } else {
106
                tnt_raise(AccessDeniedError,
15✔
107
                          priv_name(access),
108
                          schema_object_name(type),
109
                          name,
110
                          user->def->name);
111
        }
112
}
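/*
 * Illustrative sketch (hypothetical caller and arguments, chosen for
 * the example): a CREATE-space style permission check boils down to a
 * single call, with the pre-1.7.7 compatibility flag supplied by the
 * caller.
 *
 *     access_check_ddl(space_name, owner_uid, SC_SPACE, PRIV_C, true);
 */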
113

114
/**
115
 * Throw an exception if the given index definition
116
 * is incompatible with a sequence.
117
 */
118
static void
119
index_def_check_sequence(struct index_def *index_def, const char *space_name)
36✔
120
{
121
        enum field_type type = index_def->key_def->parts[0].type;
36✔
122
        if (type != FIELD_TYPE_UNSIGNED && type != FIELD_TYPE_INTEGER) {
36✔
123
                tnt_raise(ClientError, ER_MODIFY_INDEX, index_def->name,
2✔
124
                          space_name, "sequence cannot be used with "
125
                          "a non-integer key");
126
        }
127
}
34✔
128

129
/**
130
 * Support function for index_def_new_from_tuple(..)
131
 * Checks tuple (of _index space) and throws a nice error if it is invalid
132
 * Checks only types of fields and their count!
133
 * Additionally determines version of tuple structure
134
 * is_166plus is set as true if tuple structure is 1.6.6+
135
 * is_166plus is set as false if tuple structure is 1.6.5-
136
 */
137
static void
138
index_def_check_tuple(const struct tuple *tuple, bool *is_166plus)
31,076✔
139
{
140
        *is_166plus = true;
31,076✔
141
        const mp_type common_template[] = {MP_UINT, MP_UINT, MP_STR, MP_STR};
31,076✔
142
        const char *data = tuple_data(tuple);
31,076✔
143
        uint32_t field_count = mp_decode_array(&data);
31,076✔
144
        const char *field_start = data;
31,076✔
145
        if (field_count < 6)
31,076✔
146
                goto err;
×
147
        for (size_t i = 0; i < lengthof(common_template); i++) {
155,380✔
148
                enum mp_type type = mp_typeof(*data);
248,608✔
149
                if (type != common_template[i])
124,304✔
150
                        goto err;
×
151
                mp_next(&data);
124,304✔
152
        }
153
        if (mp_typeof(*data) == MP_UINT) {
62,152✔
154
                /* old 1.6.5- version */
155
                /* TODO: remove it in newer versions, find all 1.6.5- */
156
                *is_166plus = false;
41✔
157
                mp_next(&data);
41✔
158
                if (mp_typeof(*data) != MP_UINT)
82✔
159
                        goto err;
×
160
                if (field_count % 2)
41✔
161
                        goto err;
1✔
162
                mp_next(&data);
40✔
163
                for (uint32_t i = 6; i < field_count; i += 2) {
92✔
164
                        if (mp_typeof(*data) != MP_UINT)
104✔
165
                                goto err;
×
166
                        mp_next(&data);
52✔
167
                        if (mp_typeof(*data) != MP_STR)
104✔
168
                                goto err;
×
169
                        mp_next(&data);
52✔
170
                }
171
        } else {
172
                if (field_count != 6)
31,035✔
173
                        goto err;
×
174
                if (mp_typeof(*data) != MP_MAP)
62,070✔
175
                        goto err;
×
176
                mp_next(&data);
31,035✔
177
                if (mp_typeof(*data) != MP_ARRAY)
62,070✔
178
                        goto err;
×
179
        }
180
        return;
62,150✔
181

182
err:
183
        char got[DIAG_ERRMSG_MAX];
184
        char *p = got, *e = got + sizeof(got);
1✔
185
        data = field_start;
1✔
186
        for (uint32_t i = 0; i < field_count && p < e; i++) {
10✔
187
                enum mp_type type = mp_typeof(*data);
18✔
188
                mp_next(&data);
9✔
189
                const char *type_name;
190
                switch (type) {
9✔
191
                case MP_UINT:
192
                        type_name = "number";
5✔
193
                        break;
5✔
194
                case MP_STR:
195
                        type_name = "string";
4✔
196
                        break;
4✔
197
                case MP_ARRAY:
198
                        type_name = "array";
×
199
                        break;
×
200
                case MP_MAP:
201
                        type_name = "map";
×
202
                        break;
×
203
                default:
204
                        type_name = "unknown";
×
205
                        break;
×
206
                }
207
                p += snprintf(p, e - p, i ? ", %s" : "%s", type_name);
9✔
208
        }
209
        const char *expected;
210
        if (*is_166plus) {
1✔
211
                expected = "space id (number), index id (number), "
×
212
                        "name (string), type (string), "
213
                        "options (map), parts (array)";
214
        } else {
215
                expected = "space id (number), index id (number), "
1✔
216
                        "name (string), type (string), "
217
                        "is_unique (number), part count (number) "
218
                        "part0 field no (number), "
219
                        "part0 field type (string), ...";
220
        }
221
        tnt_raise(ClientError, ER_WRONG_INDEX_RECORD, got, expected);
1✔
222
}
223

224
/**
225
 * Fill index_opts structure from opts field in tuple of space _index
226
 * Throw an error if an option is unrecognized.
227
 */
228
static void
229
index_opts_decode(struct index_opts *opts, const char *map)
31,035✔
230
{
231
        index_opts_create(opts);
31,035✔
232
        if (opts_decode(opts, index_opts_reg, &map, ER_WRONG_INDEX_OPTIONS,
31,035✔
233
                        BOX_INDEX_FIELD_OPTS, NULL) != 0)
234
                diag_raise();
2✔
235
        if (opts->distance == rtree_index_distance_type_MAX) {
31,033✔
236
                tnt_raise(ClientError, ER_WRONG_INDEX_OPTIONS,
1✔
237
                          BOX_INDEX_FIELD_OPTS, "distance must be either "\
238
                          "'euclid' or 'manhattan'");
239
        }
240
        if (opts->range_size <= 0) {
31,032✔
241
                tnt_raise(ClientError, ER_WRONG_INDEX_OPTIONS,
1✔
242
                          BOX_INDEX_FIELD_OPTS,
243
                          "range_size must be greater than 0");
244
        }
245
        if (opts->page_size <= 0 || opts->page_size > opts->range_size) {
31,031✔
246
                tnt_raise(ClientError, ER_WRONG_INDEX_OPTIONS,
2✔
247
                          BOX_INDEX_FIELD_OPTS,
248
                          "page_size must be greater than 0 and "
249
                          "less than or equal to range_size");
250
        }
251
        if (opts->run_count_per_level <= 0) {
31,029✔
252
                tnt_raise(ClientError, ER_WRONG_INDEX_OPTIONS,
1✔
253
                          BOX_INDEX_FIELD_OPTS,
254
                          "run_count_per_level must be greater than 0");
255
        }
256
        if (opts->run_size_ratio <= 1) {
31,028✔
257
                tnt_raise(ClientError, ER_WRONG_INDEX_OPTIONS,
1✔
258
                          BOX_INDEX_FIELD_OPTS,
259
                          "run_size_ratio must be greater than 1");
260
        }
261
        if (opts->bloom_fpr <= 0 || opts->bloom_fpr > 1) {
31,027✔
262
                tnt_raise(ClientError, ER_WRONG_INDEX_OPTIONS,
2✔
263
                          BOX_INDEX_FIELD_OPTS,
264
                          "bloom_fpr must be greater than 0 and "
265
                          "less than or equal to 1");
266
        }
267
}
31,025✔
268

269
/**
270
 * Create a index_def object from a record in _index
271
 * system space.
272
 *
273
 * Check that:
274
 * - index id is within range
275
 * - index type is supported
276
 * - part count > 0
277
 * - there are parts for the specified part count
278
 * - types of parts in the parts array are known to the system
279
 * - fieldno of each part in the parts array is within limits
280
 */
281
static struct index_def *
282
index_def_new_from_tuple(struct tuple *tuple, struct space *space)
31,076✔
283
{
284
        bool is_166plus;
285
        index_def_check_tuple(tuple, &is_166plus);
31,076✔
286

287
        struct index_opts opts;
288
        index_opts_create(&opts);
31,075✔
289
        uint32_t id = tuple_field_u32_xc(tuple, BOX_INDEX_FIELD_SPACE_ID);
31,075✔
290
        uint32_t index_id = tuple_field_u32_xc(tuple, BOX_INDEX_FIELD_ID);
31,075✔
291
        enum index_type type =
292
                STR2ENUM(index_type, tuple_field_cstr_xc(tuple,
31,075✔
293
                                                         BOX_INDEX_FIELD_TYPE));
294
        uint32_t name_len;
295
        const char *name = tuple_field_str_xc(tuple, BOX_INDEX_FIELD_NAME,
296
                                              &name_len);
31,075✔
297
        uint32_t part_count;
298
        const char *parts;
299
        if (is_166plus) {
31,075✔
300
                /* 1.6.6+ _index space structure */
301
                const char *opts_field =
302
                        tuple_field_with_type_xc(tuple, BOX_INDEX_FIELD_OPTS,
303
                                                 MP_MAP);
31,035✔
304
                index_opts_decode(&opts, opts_field);
31,035✔
305
                parts = tuple_field(tuple, BOX_INDEX_FIELD_PARTS);
31,025✔
306
                part_count = mp_decode_array(&parts);
31,025✔
307
        } else {
308
                /* 1.6.5- _index space structure */
309
                /* TODO: remove it in newer versions, find all 1.6.5- */
310
                opts.is_unique =
40✔
311
                        tuple_field_u32_xc(tuple,
40✔
312
                                           BOX_INDEX_FIELD_IS_UNIQUE_165);
40✔
313
                part_count = tuple_field_u32_xc(tuple,
314
                                                BOX_INDEX_FIELD_PART_COUNT_165);
40✔
315
                parts = tuple_field(tuple, BOX_INDEX_FIELD_PARTS_165);
40✔
316
        }
317
        if (name_len > BOX_NAME_MAX) {
31,065✔
318
                tnt_raise(ClientError, ER_MODIFY_INDEX,
2✔
319
                          tt_cstr(name, BOX_INVALID_NAME_MAX),
320
                          space_name(space), "index name is too long");
321
        }
322
        identifier_check_xc(name, name_len);
31,063✔
323
        struct key_def *key_def = NULL;
31,047✔
324
        struct key_part_def *part_def = (struct key_part_def *)
325
                        malloc(sizeof(*part_def) * part_count);
31,047✔
326
        if (part_def == NULL) {
31,047✔
327
                tnt_raise(OutOfMemory, sizeof(*part_def) * part_count,
×
328
                          "malloc", "key_part_def");
329
        }
330
        auto key_def_guard = make_scoped_guard([&] {
31,047✔
331
                free(part_def);
31,047✔
332
                if (key_def != NULL)
31,047✔
333
                        key_def_delete(key_def);
31,033✔
334
        });
93,141✔
335
        if (is_166plus) {
31,047✔
336
                /* 1.6.6+ */
337
                if (key_def_decode_parts(part_def, part_count, &parts,
62,014✔
338
                                         space->def->fields,
31,007✔
339
                                         space->def->field_count) != 0)
31,007✔
340
                        diag_raise();
13✔
341
        } else {
342
                /* 1.6.5- TODO: remove it in newer versions, find all 1.6.5- */
343
                if (key_def_decode_parts_160(part_def, part_count, &parts,
80✔
344
                                             space->def->fields,
40✔
345
                                             space->def->field_count) != 0)
40✔
346
                        diag_raise();
1✔
347
        }
348
        key_def = key_def_new_with_parts(part_def, part_count);
31,033✔
349
        if (key_def == NULL)
31,033✔
350
                diag_raise();
×
351
        struct index_def *index_def =
352
                index_def_new(id, index_id, name, name_len, type,
31,033✔
353
                              &opts, key_def, space_index_key_def(space, 0));
31,033✔
354
        if (index_def == NULL)
31,033✔
355
                diag_raise();
×
356
        auto index_def_guard = make_scoped_guard([=] { index_def_delete(index_def); });
62,123✔
357
        index_def_check_xc(index_def, space_name(space));
31,033✔
358
        space_check_index_def_xc(space, index_def);
31,019✔
359
        if (index_def->iid == 0 && space->sequence != NULL)
30,977✔
360
                index_def_check_sequence(index_def, space_name(space));
5✔
361
        index_def_guard.is_active = false;
30,976✔
362
        return index_def;
61,952✔
363
}
364

365
/**
366
 * Fill space opts from the msgpack stream (MP_MAP field in the
367
 * tuple).
368
 */
369
static void
370
space_opts_decode(struct space_opts *opts, const char *data)
81,978✔
371
{
372
        space_opts_create(opts);
81,978✔
373
        if (data == NULL)
81,978✔
374
                return;
1✔
375

376
        bool is_170_plus = (mp_typeof(*data) == MP_MAP);
163,954✔
377
        if (!is_170_plus) {
81,977✔
378
                /* Tarantool < 1.7.0 compatibility */
379
                if (mp_typeof(*data) != MP_STR) {
108✔
380
                        tnt_raise(ClientError, ER_FIELD_TYPE,
×
381
                                  BOX_SPACE_FIELD_OPTS + TUPLE_INDEX_BASE,
382
                                  mp_type_strs[MP_STR]);
383
                }
384
                uint32_t len;
385
                const char *flags = mp_decode_str(&data, &len);
54✔
386
                flags = tt_cstr(flags, len);
54✔
387
                while (flags && *flags) {
86✔
388
                        while (isspace(*flags)) /* skip space */
16✔
389
                                flags++;
×
390
                        if (strncmp(flags, "temporary", strlen("temporary")) == 0)
16✔
391
                                opts->temporary = true;
6✔
392
                        flags = strchr(flags, ',');
16✔
393
                        if (flags)
16✔
394
                                flags++;
5✔
395
                }
396
        } else if (opts_decode(opts, space_opts_reg, &data,
81,923✔
397
                               ER_WRONG_SPACE_OPTIONS,
398
                               BOX_SPACE_FIELD_OPTS, NULL) != 0) {
399
                diag_raise();
×
400
        }
401
}
402

403
/**
404
 * Decode field definition from MessagePack map. Format:
405
 * {name: <string>, type: <string>}. Type is optional.
406
 * @param[out] field Field to decode to.
407
 * @param data MessagePack map to decode.
408
 * @param space_name Name of the space the field comes from.
409
 *        Used in error messages.
410
 * @param name_len Length of @a space_name.
411
 * @param errcode Error code to use for client errors. Either
412
 *        create or modify space errors.
413
 * @param fieldno Field number to decode. Used in error messages.
414
 * @param region Region to allocate field name.
415
 */
416
static void
417
field_def_decode(struct field_def *field, const char **data,
55,642✔
418
                 const char *space_name, uint32_t name_len,
419
                 uint32_t errcode, uint32_t fieldno, struct region *region)
420
{
421
        if (mp_typeof(**data) != MP_MAP) {
111,284✔
422
                tnt_raise(ClientError, errcode, tt_cstr(space_name, name_len),
×
423
                          tt_sprintf("field %d is not map",
424
                                     fieldno + TUPLE_INDEX_BASE));
425
        }
426
        int count = mp_decode_map(data);
55,642✔
427
        *field = field_def_default;
55,642✔
428
        for (int i = 0; i < count; ++i) {
166,965✔
429
                if (mp_typeof(**data) != MP_STR) {
222,652✔
430
                        tnt_raise(ClientError, errcode,
1✔
431
                                  tt_cstr(space_name, name_len),
432
                                  tt_sprintf("field %d format is not map"\
433
                                             " with string keys",
434
                                             fieldno + TUPLE_INDEX_BASE));
435
                }
436
                uint32_t key_len;
437
                const char *key = mp_decode_str(data, &key_len);
111,325✔
438
                if (opts_parse_key(field, field_def_reg, key, key_len, data,
111,325✔
439
                                   ER_WRONG_SPACE_FORMAT,
440
                                   fieldno + TUPLE_INDEX_BASE, region,
441
                                   true) != 0)
442
                        diag_raise();
2✔
443
        }
444
        if (field->name == NULL) {
55,639✔
445
                tnt_raise(ClientError, errcode, tt_cstr(space_name, name_len),
1✔
446
                          tt_sprintf("field %d name is not specified",
447
                                     fieldno + TUPLE_INDEX_BASE));
448
        }
449
        size_t field_name_len = strlen(field->name);
55,638✔
450
        if (field_name_len > BOX_NAME_MAX) {
55,638✔
451
                tnt_raise(ClientError, errcode, tt_cstr(space_name, name_len),
3✔
452
                          tt_sprintf("field %d name is too long",
453
                                     fieldno + TUPLE_INDEX_BASE));
454
        }
455
        identifier_check_xc(field->name, field_name_len);
55,635✔
456
        if (field->type == field_type_MAX) {
55,621✔
457
                tnt_raise(ClientError, errcode, tt_cstr(space_name, name_len),
2✔
458
                          tt_sprintf("field %d has unknown field type",
459
                                     fieldno + TUPLE_INDEX_BASE));
460
        }
461
}
55,619✔
462

463
/**
464
 * Decode MessagePack array of fields.
465
 * @param data MessagePack array of fields.
466
 * @param[out] out_count Length of a result array.
467
 * @param space_name Space name to use in error messages.
468
 * @param errcode Errcode for client errors.
469
 * @param region Region to allocate result array.
470
 *
471
 * @retval Array of fields.
472
 */
473
static struct field_def *
474
space_format_decode(const char *data, uint32_t *out_count,
81,669✔
475
                    const char *space_name, uint32_t name_len,
476
                    uint32_t errcode, struct region *region)
477
{
478
        /* Type is checked by _space format. */
479
        assert(mp_typeof(*data) == MP_ARRAY);
163,338✔
480
        uint32_t count = mp_decode_array(&data);
81,669✔
481
        *out_count = count;
81,669✔
482
        if (count == 0)
81,669✔
483
                return NULL;
69,962✔
484
        size_t size = count * sizeof(struct field_def);
11,707✔
485
        struct field_def *region_defs =
486
                (struct field_def *) region_alloc_xc(region, size);
11,707✔
487
        for (uint32_t i = 0; i < count; ++i) {
67,326✔
488
                field_def_decode(&region_defs[i], &data, space_name, name_len,
55,642✔
489
                                 errcode, i, region);
55,642✔
490
        }
491
        return region_defs;
11,684✔
492
}
493

494
/**
495
 * Fill space_def structure from struct tuple.
496
 */
497
static struct space_def *
498
space_def_new_from_tuple(struct tuple *tuple, uint32_t errcode,
82,023✔
499
                         struct region *region)
500
{
501
        uint32_t name_len;
502
        const char *name =
503
                tuple_field_str_xc(tuple, BOX_SPACE_FIELD_NAME, &name_len);
82,023✔
504
        if (name_len > BOX_NAME_MAX)
82,023✔
505
                tnt_raise(ClientError, errcode,
4✔
506
                          tt_cstr(name, BOX_INVALID_NAME_MAX),
507
                          "space name is too long");
508
        identifier_check_xc(name, name_len);
82,019✔
509
        uint32_t id = tuple_field_u32_xc(tuple, BOX_SPACE_FIELD_ID);
82,003✔
510
        if (id > BOX_SPACE_MAX) {
82,003✔
511
                tnt_raise(ClientError, errcode, tt_cstr(name, name_len),
×
512
                          "space id is too big");
513
        }
514
        if (id == 0) {
82,003✔
515
                tnt_raise(ClientError, errcode, tt_cstr(name, name_len),
1✔
516
                          "space id 0 is reserved");
517
        }
518
        uint32_t uid = tuple_field_u32_xc(tuple, BOX_SPACE_FIELD_UID);
82,002✔
519
        uint32_t exact_field_count =
520
                tuple_field_u32_xc(tuple, BOX_SPACE_FIELD_FIELD_COUNT);
82,002✔
521
        uint32_t engine_name_len;
522
        const char *engine_name =
523
                tuple_field_str_xc(tuple, BOX_SPACE_FIELD_ENGINE,
524
                                   &engine_name_len);
82,002✔
525
        /*
526
         * Engines are compiled-in so their names are known in
527
         * advance to be shorter than names of other identifiers.
528
         */
529
        if (engine_name_len > ENGINE_NAME_MAX) {
82,002✔
530
                tnt_raise(ClientError, errcode, tt_cstr(name, name_len),
1✔
531
                          "space engine name is too long");
532
        }
533
        identifier_check_xc(engine_name, engine_name_len);
82,001✔
534
        const char *space_opts;
535
        struct field_def *fields;
536
        uint32_t field_count;
537
        if (dd_version_id >= version_id(1, 7, 6)) {
82,001✔
538
                /* Check space opts. */
539
                space_opts =
540
                        tuple_field_with_type_xc(tuple, BOX_SPACE_FIELD_OPTS,
541
                                                 MP_MAP);
81,669✔
542
                /* Check space format */
543
                const char *format =
544
                        tuple_field_with_type_xc(tuple, BOX_SPACE_FIELD_FORMAT,
545
                                                 MP_ARRAY);
81,669✔
546
                fields = space_format_decode(format, &field_count, name,
81,669✔
547
                                             name_len, errcode, region);
81,669✔
548
                if (exact_field_count != 0 &&
81,665✔
549
                    exact_field_count < field_count) {
19✔
550
                        tnt_raise(ClientError, errcode, tt_cstr(name, name_len),
×
551
                                  "exact_field_count must be either 0 or >= "\
552
                                  "formatted field count");
553
                }
554
        } else {
555
                fields = NULL;
332✔
556
                field_count = 0;
332✔
557
                space_opts = tuple_field(tuple, BOX_SPACE_FIELD_OPTS);
332✔
558
        }
559
        struct space_opts opts;
560
        space_opts_decode(&opts, space_opts);
81,978✔
561
        struct space_def *def =
562
                space_def_new_xc(id, uid, exact_field_count, name, name_len,
81,978✔
563
                                 engine_name, engine_name_len, &opts, fields,
564
                                 field_count);
81,978✔
565
        auto def_guard = make_scoped_guard([=] { space_def_delete(def); });
163,954✔
566
        struct engine *engine = engine_find_xc(def->engine_name);
81,976✔
567
        engine_check_space_def_xc(engine, def);
81,975✔
568
        def_guard.is_active = false;
81,974✔
569
        return def;
163,948✔
570
}
571

572
/**
573
 * Swap old and new space triggers (move the original triggers
574
 * to the new space, or vice versa, restore the original triggers
575
 * in the old space).
576
 */
577
static void
578
space_swap_triggers(struct space *new_space, struct space *old_space)
44,406✔
579
{
580
        rlist_swap(&new_space->before_replace, &old_space->before_replace);
44,406✔
581
        rlist_swap(&new_space->on_replace, &old_space->on_replace);
44,406✔
582
        rlist_swap(&new_space->on_stmt_begin, &old_space->on_stmt_begin);
44,406✔
583
}
44,406✔
584

585
/**
586
 * True if the space has records identified by key 'uid'.
587
 * Uses 'iid' index.
588
 */
589
bool
590
space_has_data(uint32_t id, uint32_t iid, uint32_t uid)
69,905✔
591
{
592
        struct space *space = space_by_id(id);
69,905✔
593
        if (space == NULL)
69,905✔
594
                return false;
×
595

596
        if (space_index(space, iid) == NULL)
69,905✔
597
                return false;
×
598

599
        struct index *index = index_find_system_xc(space, iid);
69,905✔
600
        char key[6];
601
        assert(mp_sizeof_uint(BOX_SYSTEM_ID_MIN) <= sizeof(key));
69,905✔
602
        mp_encode_uint(key, uid);
69,905✔
603
        struct iterator *it = index_create_iterator_xc(index, ITER_EQ, key, 1);
69,905✔
604
        IteratorGuard iter_guard(it);
139,810✔
605
        if (iterator_next_xc(it) != NULL)
69,905✔
606
                return true;
9✔
607
        return false;
69,896✔
608
}
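/*
 * Illustrative call (the space and index ids are assumptions made for
 * the sketch): checking whether a user still owns any space becomes a
 * lookup in _space by its owner index.
 *
 *     if (space_has_data(BOX_SPACE_ID, 1, uid))
 *             ... refuse to drop the user ...
 */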
609

610
/* }}} */
611

612
/* {{{ struct alter_space - the body of a full blown alter */
613
struct alter_space;
614

615
class AlterSpaceOp {
616
public:
617
        AlterSpaceOp(struct alter_space *alter);
618

619
        /** Link in alter_space::ops. */
620
        struct rlist link;
621
        /**
622
         * Called before creating the new space. Used to update
623
         * the space definition and/or key list that will be used
624
         * for creating the new space. Must not yield or fail.
625
         */
626
        virtual void alter_def(struct alter_space * /* alter */) {}
59,393✔
627
        /**
628
         * Called after creating a new space. Used for performing
629
         * long-lasting operations, such as index rebuild or format
630
         * check. May yield. May throw an exception. Must not modify
631
         * the old space.
632
         */
633
        virtual void prepare(struct alter_space * /* alter */) {}
62,569✔
634
        /**
635
         * Called after all registered operations have completed
636
         * the preparation phase. Used to propagate the old space
637
         * state to the new space (e.g. move unchanged indexes).
638
         * Must not yield or fail.
639
         */
640
        virtual void alter(struct alter_space * /* alter */) {}
39,438✔
641
        /**
642
         * Called after the change has been successfully written
643
         * to WAL. Must not fail.
644
         */
645
        virtual void commit(struct alter_space * /* alter */,
64,192✔
646
                            int64_t /* signature */) {}
64,192✔
647
        /**
648
         * Called in case a WAL error occurred. It is supposed to undo
649
         * the effect of AlterSpaceOp::prepare and AlterSpaceOp::alter.
650
         * Must not fail.
651
         */
652
        virtual void rollback(struct alter_space * /* alter */) {}
7✔
653

654
        virtual ~AlterSpaceOp() {}
102,333✔
655

656
        void *operator new(size_t size)
102,333✔
657
        {
658
                return region_aligned_calloc_xc(&fiber()->gc, size,
102,333✔
659
                                                alignof(uint64_t));
102,333✔
660
        }
661
        void operator delete(void * /* ptr */) {}
102,333✔
662
};
663

664
/**
665
 * A trigger installed on transaction commit/rollback events of
666
 * the transaction which initiated the alter.
667
 */
668
static struct trigger *
669
txn_alter_trigger_new(trigger_f run, void *data)
1,036,474✔
670
{
671
        struct trigger *trigger = (struct trigger *)
672
                region_calloc_object_xc(&fiber()->gc, struct trigger);
1,036,474✔
673
        trigger->run = run;
1,036,474✔
674
        trigger->data = data;
1,036,474✔
675
        trigger->destroy = NULL;
1,036,474✔
676
        return trigger;
1,036,474✔
677
}
678

679
struct alter_space {
680
        /** List of alter operations */
681
        struct rlist ops;
682
        /** Definition of the new space - space_def. */
683
        struct space_def *space_def;
684
        /** Definition of the new space - keys. */
685
        struct rlist key_list;
686
        /** Old space. */
687
        struct space *old_space;
688
        /** New space. */
689
        struct space *new_space;
690
        /**
691
         * Assigned to the new primary key definition if we're
692
         * rebuilding the primary key, i.e. changing its key parts
693
         * substantially.
694
         */
695
        struct key_def *pk_def;
696
        /**
697
         * Min field count of a new space. It is calculated before
698
         * the new space is created and used to update optionality
699
         * of key_defs and key_parts.
700
         */
701
        uint32_t new_min_field_count;
702
};
703

704
static struct alter_space *
705
alter_space_new(struct space *old_space)
44,580✔
706
{
707
        struct alter_space *alter =
708
                region_calloc_object_xc(&fiber()->gc, struct alter_space);
44,580✔
709
        rlist_create(&alter->ops);
44,580✔
710
        alter->old_space = old_space;
44,580✔
711
        alter->space_def = space_def_dup_xc(alter->old_space->def);
44,580✔
712
        if (old_space->format != NULL)
44,580✔
713
                alter->new_min_field_count = old_space->format->min_field_count;
34,173✔
714
        else
715
                alter->new_min_field_count = 0;
10,407✔
716
        return alter;
44,580✔
717
}
718

719
/** Destroy alter. */
720
static void
721
alter_space_delete(struct alter_space *alter)
146,913✔
722
{
723
        /* Destroy the ops. */
724
        while (! rlist_empty(&alter->ops)) {
249,246✔
725
                AlterSpaceOp *op = rlist_shift_entry(&alter->ops,
102,333✔
726
                                                     AlterSpaceOp, link);
727
                delete op;
102,333✔
728
        }
729
        /* Delete the new space, if any. */
730
        if (alter->new_space)
44,580✔
731
                space_delete(alter->new_space);
71✔
732
        space_def_delete(alter->space_def);
44,580✔
733
}
44,580✔
734

735
AlterSpaceOp::AlterSpaceOp(struct alter_space *alter)
102,333✔
736
{
737
        /* Add to the tail: operations must be processed in order. */
738
        rlist_add_tail_entry(&alter->ops, this, link);
102,333✔
739
}
102,333✔
740

741
/**
742
 * Commit the alter.
743
 *
744
 * Move all unchanged indexes from the old space to the new space.
745
 * Set the newly built indexes in the new space, or free memory
746
 * of the dropped indexes.
747
 * Replace the old space with a new one in the space cache.
748
 */
749
static void
750
alter_space_commit(struct trigger *trigger, void *event)
44,392✔
751
{
752
        struct txn *txn = (struct txn *) event;
44,392✔
753
        struct alter_space *alter = (struct alter_space *) trigger->data;
44,392✔
754
        /*
755
         * Commit alter ops, this will move the changed
756
         * indexes into their new places.
757
         */
758
        class AlterSpaceOp *op;
759
        rlist_foreach_entry(op, &alter->ops, link) {
146,368✔
760
                op->commit(alter, txn->signature);
101,976✔
761
        }
762

763
        trigger_run_xc(&on_alter_space, alter->new_space);
44,392✔
764

765
        alter->new_space = NULL; /* for alter_space_delete(). */
44,392✔
766
        /*
767
         * Delete the old version of the space, we are not
768
         * going to use it.
769
         */
770
        space_delete(alter->old_space);
44,392✔
771
        alter_space_delete(alter);
44,392✔
772
}
44,392✔
773

774
/**
775
 * Rollback all effects of space alter. This is
776
 * a transaction trigger, and it fires most likely
777
 * upon a failed write to the WAL.
778
 *
779
 * Keep in mind that we may end up here in case of
780
 * alter_space_commit() failure (unlikely)
781
 */
782
static void
783
alter_space_rollback(struct trigger *trigger, void * /* event */)
7✔
784
{
785
        struct alter_space *alter = (struct alter_space *) trigger->data;
7✔
786
        /* Rollback alter ops */
787
        class AlterSpaceOp *op;
788
        rlist_foreach_entry(op, &alter->ops, link) {
18✔
789
                op->rollback(alter);
11✔
790
        }
791
        /* Rebuild index maps once for all indexes. */
792
        space_fill_index_map(alter->old_space);
7✔
793
        space_fill_index_map(alter->new_space);
7✔
794
        /*
795
         * Don't forget about space triggers.
796
         */
797
        space_swap_triggers(alter->new_space, alter->old_space);
7✔
798
        struct space *new_space = space_cache_replace(alter->old_space);
7✔
799
        assert(new_space == alter->new_space);
7✔
800
        (void) new_space;
801
        alter_space_delete(alter);
7✔
802
}
7✔
803

804
/**
805
 * alter_space_do() - do all the work necessary to
806
 * create a new space.
807
 *
808
 * If something may fail during alter, it must be done here,
809
 * before a record is written to the Write Ahead Log.  Only
810
 * trivial and infallible actions are left to the commit phase
811
 * of the alter.
812
 *
813
 * The implementation of this function follows "Template Method"
814
 * pattern, providing a skeleton of the alter, while all the
815
 * details are encapsulated in AlterSpaceOp methods.
816
 *
817
 * These are the major steps of alter defining the structure of
818
 * the algorithm and performed regardless of what is altered:
819
 *
820
 * - a copy of the definition of the old space is created
821
 * - the definition of the old space is altered, to get
822
 *   definition of a new space
823
 * - an instance of the new space is created, according to the new
824
 *   definition; the space is so far empty
825
 * - data structures of the new space are built; sometimes, it
826
 *   doesn't need to happen, e.g. when alter only changes the name
827
 *   of a space or an index, or other accidental property.
828
 *   If any data structure needs to be built, e.g. a new index,
829
 *   only this index is built, not the entire space with all its
830
 *   indexes.
831
 * - at commit, the new space is coalesced with the old one.
832
 *   On rollback, the new space is deleted.
833
 */
834
static void
835
alter_space_do(struct txn *txn, struct alter_space *alter)
44,480✔
836
{
837
        /*
838
         * Prepare triggers while we may fail. Note, we don't have to
839
         * free them in case of failure, because they are allocated on
840
         * the region.
841
         */
842
        struct trigger *on_commit, *on_rollback;
843
        on_commit = txn_alter_trigger_new(alter_space_commit, alter);
44,480✔
844
        on_rollback = txn_alter_trigger_new(alter_space_rollback, alter);
44,480✔
845

846
        /* Create a definition of the new space. */
847
        space_dump_def(alter->old_space, &alter->key_list);
44,480✔
848
        class AlterSpaceOp *op;
849
        /*
850
         * Alter the definition of the old space, so that
851
         * a new space can be created with a new definition.
852
         */
853
        rlist_foreach_entry(op, &alter->ops, link)
146,672✔
854
                op->alter_def(alter);
102,192✔
855
        /*
856
         * Create a new (empty) space for the new definition.
857
         * Sic: the triggers are not moved over yet.
858
         */
859
        alter->new_space = space_new_xc(alter->space_def, &alter->key_list);
44,480✔
860
        /*
861
         * Copy the replace function, the new space is at the same recovery
862
         * phase as the old one. This hack is especially necessary for
863
         * system spaces, which may be altered in some row in the
864
         * snapshot/xlog, but need to continue staying "fully
865
         * built".
866
         */
867
        space_prepare_alter_xc(alter->old_space, alter->new_space);
44,463✔
868

869
        alter->new_space->sequence = alter->old_space->sequence;
44,457✔
870
        memcpy(alter->new_space->access, alter->old_space->access,
44,457✔
871
               sizeof(alter->old_space->access));
44,457✔
872

873
        /*
874
         * Build new indexes, check if tuples conform to
875
         * the new space format.
876
         */
877
        rlist_foreach_entry(op, &alter->ops, link)
146,464✔
878
                op->prepare(alter);
102,065✔
879

880
        /*
881
         * This function must not throw exceptions or yield after
882
         * this point.
883
         */
884

885
        /* Move old indexes, update space format. */
886
        rlist_foreach_entry(op, &alter->ops, link)
146,386✔
887
                op->alter(alter);
101,987✔
888

889
        /* Rebuild index maps once for all indexes. */
890
        space_fill_index_map(alter->old_space);
44,399✔
891
        space_fill_index_map(alter->new_space);
44,399✔
892
        /*
893
         * Don't forget about space triggers.
894
         */
895
        space_swap_triggers(alter->new_space, alter->old_space);
44,399✔
896
        /*
897
         * The new space is ready. Time to update the space
898
         * cache with it.
899
         */
900
        struct space *old_space = space_cache_replace(alter->new_space);
44,399✔
901
        (void) old_space;
902
        assert(old_space == alter->old_space);
44,399✔
903

904
        /*
905
         * Install transaction commit/rollback triggers to either
906
         * finish or rollback the DDL depending on the results of
907
         * writing to WAL.
908
         */
909
        txn_on_commit(txn, on_commit);
44,399✔
910
        txn_on_rollback(txn, on_rollback);
44,399✔
911
}
44,399✔
912

913
/* }}}  */
914

915
/* {{{ AlterSpaceOp descendants - alter operations, such as Add/Drop index */
916

917
/**
918
 * This operation does not modify the space, it just checks that
919
 * tuples stored in it conform to the new format.
920
 */
921
class CheckSpaceFormat: public AlterSpaceOp
16,730✔
922
{
923
public:
924
        CheckSpaceFormat(struct alter_space *alter)
8,365✔
925
                :AlterSpaceOp(alter) {}
8,365✔
926
        virtual void prepare(struct alter_space *alter);
927
};
928

929
void
930
CheckSpaceFormat::prepare(struct alter_space *alter)
8,356✔
931
{
932
        struct space *new_space = alter->new_space;
8,356✔
933
        struct space *old_space = alter->old_space;
8,356✔
934
        struct tuple_format *new_format = new_space->format;
8,356✔
935
        struct tuple_format *old_format = old_space->format;
8,356✔
936
        if (old_format != NULL) {
8,356✔
937
                assert(new_format != NULL);
8,297✔
938
                if (!tuple_format1_can_store_format2_tuples(new_format,
8,297✔
939
                                                            old_format))
940
                    space_check_format_xc(old_space, new_format);
7,283✔
941
        }
942
}
8,324✔
943

944
/** Change non-essential properties of a space. */
945
class ModifySpace: public AlterSpaceOp
946
{
947
public:
948
        ModifySpace(struct alter_space *alter, struct space_def *def)
8,304✔
949
                :AlterSpaceOp(alter), new_def(def), new_dict(NULL) {}
8,304✔
950
        /* New space definition. */
951
        struct space_def *new_def;
952
        /**
953
         * Newly created field dictionary. When new space_def is
954
         * created, it allocates new dictionary. Alter moves new
955
         * names into an old dictionary and deletes new one.
956
         */
957
        struct tuple_dictionary *new_dict;
958
        virtual void alter_def(struct alter_space *alter);
959
        virtual void alter(struct alter_space *alter);
960
        virtual void rollback(struct alter_space *alter);
961
        virtual ~ModifySpace();
962
};
963

964
/** Amend the definition of the new space. */
965
void
966
ModifySpace::alter_def(struct alter_space *alter)
8,304✔
967
{
968
        /*
969
         * Use the old dictionary for the new space, because
970
         * it is already referenced by existing tuple formats.
971
         * We will update it in place in ModifySpace::alter.
972
         */
973
        new_dict = new_def->dict;
8,304✔
974
        new_def->dict = alter->old_space->def->dict;
8,304✔
975
        tuple_dictionary_ref(new_def->dict);
8,304✔
976

977
        space_def_delete(alter->space_def);
8,304✔
978
        alter->space_def = new_def;
8,304✔
979
        /* Now alter owns the def. */
980
        new_def = NULL;
8,304✔
981
}
8,304✔
982

983
void
984
ModifySpace::alter(struct alter_space *alter)
8,266✔
985
{
986
        /*
987
         * Move new names into an old dictionary, which already is
988
         * referenced by existing tuple formats. New dictionary
989
         * object is deleted later, in destructor.
990
         */
991
        tuple_dictionary_swap(alter->new_space->def->dict, new_dict);
8,266✔
992
}
8,266✔
993

994
void
995
ModifySpace::rollback(struct alter_space *alter)
×
996
{
997
        tuple_dictionary_swap(alter->new_space->def->dict, new_dict);
×
998
}
×
999

1000
ModifySpace::~ModifySpace()
24,912✔
1001
{
1002
        if (new_dict != NULL)
8,304✔
1003
                tuple_dictionary_unref(new_dict);
8,304✔
1004
        if (new_def != NULL)
8,304✔
1005
                space_def_delete(new_def);
×
1006
}
16,608✔
1007

1008
/** DropIndex - remove an index from space. */
1009

1010
class DropIndex: public AlterSpaceOp
6,254✔
1011
{
1012
public:
1013
        DropIndex(struct alter_space *alter, struct index_def *def_arg)
3,127✔
1014
                :AlterSpaceOp(alter), old_index_def(def_arg) {}
3,127✔
1015
        /** A reference to the definition of the dropped index. */
1016
        struct index_def *old_index_def;
1017
        virtual void alter_def(struct alter_space *alter);
1018
        virtual void prepare(struct alter_space *alter);
1019
        virtual void commit(struct alter_space *alter, int64_t lsn);
1020
};
1021

1022
/*
1023
 * Alter the definition of the new space and remove
1024
 * the new index from it.
1025
 */
1026
void
1027
DropIndex::alter_def(struct alter_space * /* alter */)
3,127✔
1028
{
1029
        rlist_del_entry(old_index_def, link);
3,127✔
1030
}
3,127✔
1031

1032
/* Do the drop. */
1033
void
1034
DropIndex::prepare(struct alter_space *alter)
3,125✔
1035
{
1036
        if (old_index_def->iid == 0)
3,125✔
1037
                space_drop_primary_key(alter->new_space);
2,377✔
1038
}
3,125✔
1039

1040
void
1041
DropIndex::commit(struct alter_space *alter, int64_t /* signature */)
3,124✔
1042
{
1043
        struct index *index = space_index(alter->old_space,
3,124✔
1044
                                          old_index_def->iid);
6,248✔
1045
        assert(index != NULL);
3,124✔
1046
        index_commit_drop(index);
3,124✔
1047
}
3,124✔
1048

1049
/**
1050
 * A no-op to preserve the old index data in the new space.
1051
 * Added to the alter specification when the index at hand
1052
 * is not affected by alter in any way.
1053
 */
1054
class MoveIndex: public AlterSpaceOp
95,432✔
1055
{
1056
public:
1057
        MoveIndex(struct alter_space *alter, uint32_t iid_arg)
47,716✔
1058
                :AlterSpaceOp(alter), iid(iid_arg) {}
47,716✔
1059
        /** id of the index on the move. */
1060
        uint32_t iid;
1061
        virtual void alter(struct alter_space *alter);
1062
        virtual void rollback(struct alter_space *alter);
1063
};
1064

1065
void
1066
MoveIndex::alter(struct alter_space *alter)
47,606✔
1067
{
1068
        space_swap_index(alter->old_space, alter->new_space, iid, iid);
47,606✔
1069
}
47,606✔
1070

1071
void
1072
MoveIndex::rollback(struct alter_space *alter)
4✔
1073
{
1074
        space_swap_index(alter->old_space, alter->new_space, iid, iid);
4✔
1075
}
4✔
1076

1077
/**
1078
 * Change non-essential properties of an index, i.e.
1079
 * properties not involving index data or layout on disk.
1080
 */
1081
class ModifyIndex: public AlterSpaceOp
1082
{
1083
public:
1084
        ModifyIndex(struct alter_space *alter,
6,701✔
1085
                    struct index_def *new_index_def_arg,
1086
                    struct index_def *old_index_def_arg)
1087
                : AlterSpaceOp(alter),new_index_def(new_index_def_arg),
6,701✔
1088
                  old_index_def(old_index_def_arg) {
6,701✔
1089
                if (new_index_def->iid == 0 &&
13,318✔
1090
                    key_part_cmp(new_index_def->key_def->parts,
19,851✔
1091
                                 new_index_def->key_def->part_count,
6,617✔
1092
                                 old_index_def->key_def->parts,
6,617✔
1093
                                 old_index_def->key_def->part_count) != 0) {
6,617✔
1094
                        /*
1095
                         * Primary parts have been changed -
1096
                         * update secondary indexes.
1097
                         */
1098
                        alter->pk_def = new_index_def->key_def;
12✔
1099
                }
1100
        }
6,701✔
1101
        struct index_def *new_index_def;
1102
        struct index_def *old_index_def;
1103
        virtual void alter_def(struct alter_space *alter);
1104
        virtual void alter(struct alter_space *alter);
1105
        virtual void commit(struct alter_space *alter, int64_t lsn);
1106
        virtual void rollback(struct alter_space *alter);
1107
        virtual ~ModifyIndex();
1108
};
1109

1110
/** Update the definition of the new space */
1111
void
1112
ModifyIndex::alter_def(struct alter_space *alter)
6,701✔
1113
{
1114
        rlist_del_entry(old_index_def, link);
6,701✔
1115
        index_def_list_add(&alter->key_list, new_index_def);
6,701✔
1116
}
6,701✔
1117

1118
void
1119
ModifyIndex::alter(struct alter_space *alter)
6,677✔
1120
{
1121
        assert(old_index_def->iid == new_index_def->iid);
6,677✔
1122
        /*
1123
         * Move the old index to the new space to preserve the
1124
         * original data, but use the new definition.
1125
         */
1126
        space_swap_index(alter->old_space, alter->new_space,
13,354✔
1127
                         old_index_def->iid, new_index_def->iid);
20,031✔
1128
        struct index *old_index = space_index(alter->old_space,
6,677✔
1129
                                              old_index_def->iid);
13,354✔
1130
        assert(old_index != NULL);
6,677✔
1131
        struct index *new_index = space_index(alter->new_space,
6,677✔
1132
                                              new_index_def->iid);
13,354✔
1133
        assert(new_index != NULL);
6,677✔
1134
        SWAP(old_index->def, new_index->def);
6,677✔
1135
        index_update_def(new_index);
6,677✔
1136
}
6,677✔
1137

1138
void
1139
ModifyIndex::commit(struct alter_space *alter, int64_t signature)
6,677✔
1140
{
1141
        struct index *new_index = space_index(alter->new_space,
6,677✔
1142
                                              new_index_def->iid);
13,354✔
1143
        assert(new_index != NULL);
6,677✔
1144
        index_commit_modify(new_index, signature);
6,677✔
1145
}
6,677✔
1146

1147
void
1148
ModifyIndex::rollback(struct alter_space *alter)
×
1149
{
1150
        assert(old_index_def->iid == new_index_def->iid);
×
1151
        /*
1152
         * Restore indexes.
1153
         */
1154
        space_swap_index(alter->old_space, alter->new_space,
×
1155
                         old_index_def->iid, new_index_def->iid);
×
1156
        struct index *old_index = space_index(alter->old_space,
×
1157
                                              old_index_def->iid);
×
1158
        assert(old_index != NULL);
×
1159
        struct index *new_index = space_index(alter->new_space,
×
1160
                                              new_index_def->iid);
×
1161
        assert(new_index != NULL);
×
1162
        SWAP(old_index->def, new_index->def);
×
1163
        index_update_def(old_index);
×
1164
}
×
1165

1166
ModifyIndex::~ModifyIndex()
20,103✔
1167
{
1168
        index_def_delete(new_index_def);
6,701✔
1169
}
13,402✔
1170

1171
/** CreateIndex - add a new index to the space. */
1172
class CreateIndex: public AlterSpaceOp
1173
{
1174
public:
1175
        CreateIndex(struct alter_space *alter)
23,494✔
1176
                :AlterSpaceOp(alter), new_index(NULL), new_index_def(NULL)
23,494✔
1177
        {}
23,494✔
1178
        /** New index. */
1179
        struct index *new_index;
1180
        /** New index index_def. */
1181
        struct index_def *new_index_def;
1182
        virtual void alter_def(struct alter_space *alter);
1183
        virtual void prepare(struct alter_space *alter);
1184
        virtual void commit(struct alter_space *alter, int64_t lsn);
1185
        virtual ~CreateIndex();
1186
};
1187

1188
/** Add definition of the new key to the new space def. */
1189
void
1190
CreateIndex::alter_def(struct alter_space *alter)
23,401✔
1191
{
1192
        index_def_list_add(&alter->key_list, new_index_def);
23,401✔
1193
}
23,401✔
1194

1195
/**
1196
 * Optionally build the new index.
1197
 *
1198
 * During recovery the space is often not fully constructed yet
1199
 * anyway, so there is no need to fully populate index with data,
1200
 * it is done at the end of recovery.
1201
 *
1202
 * Note that system spaces are an exception to this, since
1203
 * they are fully enabled at all times.
1204
 */
1205
void
1206
CreateIndex::prepare(struct alter_space *alter)
23,390✔
1207
{
1208
        /* Get the new index and build it.  */
1209
        new_index = space_index(alter->new_space, new_index_def->iid);
23,390✔
1210
        assert(new_index != NULL);
23,390✔
1211

1212
        if (new_index_def->iid == 0) {
23,390✔
1213
                /*
1214
                 * Adding a primary key: bring the space
1215
                 * up to speed with the current recovery
1216
                 * state. During snapshot recovery it
1217
                 * means preparing the primary key for
1218
                 * build (beginBuild()). During xlog
1219
                 * recovery, it means building the primary
1220
                 * key. After recovery, it means building
1221
                 * all keys.
1222
                 */
1223
                space_add_primary_key_xc(alter->new_space);
7,474✔
1224
                return;
7,473✔
1225
        }
1226
        space_build_index_xc(alter->old_space, new_index,
15,916✔
1227
                             alter->new_space->format);
31,832✔
1228
}
1229

1230
void
1231
CreateIndex::commit(struct alter_space *alter, int64_t signature)
23,376✔
1232
{
1233
        (void) alter;
1234
        assert(new_index != NULL);
23,376✔
1235
        index_commit_create(new_index, signature);
23,376✔
1236
        new_index = NULL;
23,376✔
1237
}
23,376✔
1238

1239
CreateIndex::~CreateIndex()
70,482✔
1240
{
1241
        if (new_index != NULL)
23,494✔
1242
                index_abort_create(new_index);
14✔
1243
        if (new_index_def != NULL)
23,494✔
1244
                index_def_delete(new_index_def);
23,401✔
1245
}
46,988✔
1246

1247
/**
1248
 * RebuildIndex - drop the old index data and rebuild index
1249
 * from by reading the primary key. Used when key_def of
1250
 * an index is changed.
1251
 */
1252
class RebuildIndex: public AlterSpaceOp
1253
{
1254
public:
1255
        RebuildIndex(struct alter_space *alter,
1,266✔
1256
                     struct index_def *new_index_def_arg,
1257
                     struct index_def *old_index_def_arg)
1258
                :AlterSpaceOp(alter), new_index(NULL),
1,266✔
1259
                new_index_def(new_index_def_arg),
1260
                old_index_def(old_index_def_arg)
1,266✔
1261
        {
1262
                /* We may want to rebuild secondary keys as well. */
1263
                if (new_index_def->iid == 0)
1,266✔
1264
                        alter->pk_def = new_index_def->key_def;
1,237✔
1265
        }
1,266✔
1266
        /** New index. */
1267
        struct index *new_index;
1268
        /** New index index_def. */
1269
        struct index_def *new_index_def;
1270
        /** Old index index_def. */
1271
        struct index_def *old_index_def;
1272
        virtual void alter_def(struct alter_space *alter);
1273
        virtual void prepare(struct alter_space *alter);
1274
        virtual void commit(struct alter_space *alter, int64_t signature);
1275
        virtual ~RebuildIndex();
1276
};
1277

1278
/** Add definition of the new key to the new space def. */
1279
void
1280
RebuildIndex::alter_def(struct alter_space *alter)
1,266✔
1281
{
1282
        rlist_del_entry(old_index_def, link);
1,266✔
1283
        index_def_list_add(&alter->key_list, new_index_def);
1,266✔
1284
}
1,266✔
1285

1286
void
1287
RebuildIndex::prepare(struct alter_space *alter)
1,266✔
1288
{
1289
        /* Get the new index and build it.  */
1290
        new_index = space_index(alter->new_space, new_index_def->iid);
1,266✔
1291
        assert(new_index != NULL);
1,266✔
1292
        space_build_index_xc(alter->old_space, new_index,
1,266✔
1293
                             alter->new_space->format);
2,532✔
1294
}
1,250✔
1295

1296
void
1297
RebuildIndex::commit(struct alter_space *alter, int64_t signature)
1,250✔
1298
{
1299
        struct index *old_index = space_index(alter->old_space,
1,250✔
1300
                                              old_index_def->iid);
2,500✔
1301
        assert(old_index != NULL);
1,250✔
1302
        index_commit_drop(old_index);
1,250✔
1303
        assert(new_index != NULL);
1,250✔
1304
        index_commit_create(new_index, signature);
1,250✔
1305
        new_index = NULL;
1,250✔
1306
}
1,250✔
1307

1308
RebuildIndex::~RebuildIndex()
3,798✔
1309
{
1310
        if (new_index != NULL)
1,266✔
1311
                index_abort_create(new_index);
16✔
1312
        if (new_index_def != NULL)
1,266✔
1313
                index_def_delete(new_index_def);
1,266✔
1314
}
2,532✔
1315

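/*
 * A minimal standalone sketch of the op lifecycle used by CreateIndex
 * and RebuildIndex above: commit() publishes the new index and clears
 * the pointer, while the destructor doubles as the abort path when the
 * operation was never committed. This is an editor's illustration only
 * (IndexSketch, IndexOpSketch and the printf calls are invented
 * stand-ins, not tarantool APIs); the real ops call
 * index_commit_create() and index_abort_create().
 */
#include <cstdio>

struct IndexSketch { const char *name; };

class IndexOpSketch {
public:
	explicit IndexOpSketch(IndexSketch *index) : new_index(index) {}
	void commit()
	{
		/* Publish the index and disarm the abort path. */
		printf("committed %s\n", new_index->name);
		new_index = nullptr;
	}
	~IndexOpSketch()
	{
		/* Never committed: the destructor acts as rollback. */
		if (new_index != nullptr)
			printf("aborted %s\n", new_index->name);
	}
private:
	IndexSketch *new_index;
};

int main()
{
	IndexSketch secondary = { "secondary" };
	{
		IndexOpSketch op(&secondary);
		op.commit();		/* prints "committed secondary" */
	}
	{
		IndexOpSketch op(&secondary);
	}				/* prints "aborted secondary" */
	return 0;
}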
1316
/** TruncateIndex - truncate an index. */
1317
class TruncateIndex: public AlterSpaceOp
6,720✔
1318
{
1319
public:
1320
        TruncateIndex(struct alter_space *alter, uint32_t iid)
3,360✔
1321
                : AlterSpaceOp(alter), iid(iid) {}
3,360✔
1322
        /** id of the index to truncate. */
1323
        uint32_t iid;
1324
        virtual void prepare(struct alter_space *alter);
1325
        virtual void commit(struct alter_space *alter, int64_t signature);
1326
};
1327

1328
void
1329
TruncateIndex::prepare(struct alter_space *alter)
3,359✔
1330
{
1331
        if (iid == 0) {
3,359✔
1332
                /*
1333
                 * Notify the engine that the primary index
1334
                 * was truncated.
1335
                 */
1336
                space_drop_primary_key(alter->new_space);
2,066✔
1337
                space_add_primary_key_xc(alter->new_space);
2,066✔
1338
                return;
2,066✔
1339
        }
1340

1341
        /*
1342
         * Although the new index is empty, we still need to call
1343
         * space_build_index() to let the engine know that the
1344
         * index was recreated. For example, Vinyl uses this
1345
         * callback to load indexes during local recovery.
1346
         */
1347
        struct index *new_index = space_index(alter->new_space, iid);
1,293✔
1348
        assert(new_index != NULL);
1,293✔
1349
        space_build_index_xc(alter->new_space, new_index,
1,293✔
1350
                             alter->new_space->format);
2,586✔
1351
}
1352

1353
void
1354
TruncateIndex::commit(struct alter_space *alter, int64_t signature)
3,357✔
1355
{
1356
        struct index *old_index = space_index(alter->old_space, iid);
3,357✔
1357
        struct index *new_index = space_index(alter->new_space, iid);
3,357✔
1358

1359
        index_commit_drop(old_index);
3,357✔
1360
        index_commit_create(new_index, signature);
3,357✔
1361
}
3,357✔
1362

1363
/* }}} */
1364

1365
/**
1366
 * Delete the space. It is already removed from the space cache.
1367
 */
1368
static void
1369
on_drop_space_commit(struct trigger *trigger, void *event)
68,561✔
1370
{
1371
        (void) event;
1372
        struct space *space = (struct space *)trigger->data;
68,561✔
1373
        trigger_run_xc(&on_alter_space, space);
68,561✔
1374
        space_delete(space);
68,561✔
1375
}
68,561✔
1376

1377
/**
1378
 * Return the original space back into the cache. The effects
1379
 * of all other events that happened after the space was removed were
1380
 * reverted by the cascading rollback.
1381
 */
1382
static void
1383
on_drop_space_rollback(struct trigger *trigger, void *event)
×
1384
{
1385
        (void) event;
1386
        struct space *space = (struct space *)trigger->data;
×
1387
        space_cache_replace(space);
×
1388
}
×
1389

1390
/**
1391
 * Run the triggers registered on commit of a change in _space.
1392
 */
1393
static void
1394
on_create_space_commit(struct trigger *trigger, void *event)
73,668✔
1395
{
1396
        (void) event;
1397
        struct space *space = (struct space *)trigger->data;
73,668✔
1398
        trigger_run_xc(&on_alter_space, space);
73,668✔
1399
}
73,668✔
1400

1401
/**
1402
 * A trigger invoked on commit/rollback of DROP/ADD space.
1403
 * The trigger removes the space from the space cache.
1404
 *
1405
 * By the time the space is removed, it should be empty: we
1406
 * rely on cascading rollback.
1407
 */
1408
static void
1409
on_create_space_rollback(struct trigger *trigger, void *event)
×
1410
{
1411
        (void) event;
1412
        struct space *space = (struct space *)trigger->data;
×
1413
        struct space *cached = space_cache_delete(space_id(space));
×
1414
        (void) cached;
1415
        assert(cached == space);
×
1416
        space_delete(space);
×
1417
}
×
1418

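/*
 * The hooks above are plain commit/rollback callbacks queued on the
 * transaction: the space is put into (or removed from) the cache
 * eagerly, and whichever list fires decides whether that change
 * survives. A standalone sketch of the pattern follows; TxnSketch,
 * SpaceSketch and create_space_sketch() are editor-invented stand-ins
 * for struct txn, struct space and the real trigger machinery
 * (txn_alter_trigger_new(), txn_on_commit(), txn_on_rollback()).
 */
#include <functional>
#include <vector>

struct TxnSketch {
	std::vector<std::function<void()>> on_commit;
	std::vector<std::function<void()>> on_rollback;

	void commit()
	{
		for (auto &trigger : on_commit)
			trigger();
	}
	void rollback()
	{
		for (auto &trigger : on_rollback)
			trigger();
	}
};

struct SpaceSketch {
	bool in_cache = false;
};

/* Mirrors the CREATE path: insert into the cache eagerly, undo on rollback. */
static void
create_space_sketch(TxnSketch &txn, SpaceSketch &space)
{
	space.in_cache = true;			/* space_cache_replace() */
	txn.on_rollback.push_back([&space] {
		/* Like on_create_space_rollback(): drop the cached space. */
		space.in_cache = false;
	});
	txn.on_commit.push_back([&space] {
		/* Like on_create_space_commit(): notify listeners. */
		(void) space;
	});
}

int main()
{
	TxnSketch txn;
	SpaceSketch space;
	create_space_sketch(txn, space);
	txn.rollback();		/* space.in_cache is false again */
	return space.in_cache ? 1 : 0;
}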
1419
/**
1420
 * Create MoveIndex operations for all indexes of a space in the
1421
 * range [begin, end).
1422
 */
1423
void
1424
alter_space_move_indexes(struct alter_space *alter, uint32_t begin,
76,603✔
1425
                         uint32_t end)
1426
{
1427
        struct space *old_space = alter->old_space;
76,603✔
1428
        bool is_min_field_count_changed;
1429
        if (old_space->format != NULL) {
76,603✔
1430
                is_min_field_count_changed =
55,848✔
1431
                        old_space->format->min_field_count !=
55,848✔
1432
                        alter->new_min_field_count;
55,848✔
1433
        } else {
1434
                is_min_field_count_changed = false;
20,755✔
1435
        }
1436
        for (uint32_t index_id = begin; index_id < end; ++index_id) {
134,657✔
1437
                struct index *old_index = space_index(old_space, index_id);
58,054✔
1438
                if (old_index == NULL)
58,054✔
1439
                        continue;
67,980✔
1440
                struct index_def *old_def = old_index->def;
48,108✔
1441
                struct index_def *new_def;
1442
                uint32_t min_field_count = alter->new_min_field_count;
48,108✔
1443
                if (alter->pk_def == NULL || !index_depends_on_pk(old_index)) {
48,108✔
1444
                        if (is_min_field_count_changed) {
48,088✔
1445
                                new_def = index_def_dup(old_def);
6,628✔
1446
                                index_def_update_optionality(new_def,
1447
                                                             min_field_count);
6,628✔
1448
                                (void) new ModifyIndex(alter, new_def, old_def);
6,628✔
1449
                        } else {
1450
                                (void) new MoveIndex(alter, old_def->iid);
41,460✔
1451
                        }
1452
                        continue;
48,088✔
1453
                }
1454
                /*
1455
                 * Rebuild secondary indexes that depend on the
1456
                 * primary key since primary key parts have changed.
1457
                 */
1458
                new_def = index_def_new(old_def->space_id, old_def->iid,
80✔
1459
                                        old_def->name, strlen(old_def->name),
40✔
1460
                                        old_def->type, &old_def->opts,
20✔
1461
                                        old_def->key_def, alter->pk_def);
20✔
1462
                index_def_update_optionality(new_def, min_field_count);
20✔
1463
                auto guard = make_scoped_guard([=] { index_def_delete(new_def); });
40✔
1464
                if (!index_def_change_requires_rebuild(old_index, new_def))
20✔
1465
                        (void) new ModifyIndex(alter, new_def, old_def);
12✔
1466
                else
1467
                        (void) new RebuildIndex(alter, new_def, old_def);
8✔
1468
                guard.is_active = false;
20✔
1469
        }
1470
}
76,603✔
1471

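/*
 * alter_space_move_indexes() above, and most of the DDL triggers below,
 * lean on the make_scoped_guard()/is_active idiom: the guard frees the
 * freshly built definition unless ownership was handed over, in which
 * case the guard is disarmed. A minimal standalone sketch of that idiom
 * (an editor's reimplementation, assuming C++17; the real helper lives
 * in scoped_guard.h and is not reproduced here):
 */
#include <utility>

template <typename F>
struct ScopedGuardSketch {
	F fn;
	bool is_active = true;
	explicit ScopedGuardSketch(F f) : fn(std::move(f)) {}
	ScopedGuardSketch(const ScopedGuardSketch &) = delete;
	~ScopedGuardSketch()
	{
		/* Run the cleanup unless the guard was disarmed. */
		if (is_active)
			fn();
	}
};

static void
guard_example(bool handed_over)
{
	int *new_def = new int(42);
	auto cleanup = [new_def] { delete new_def; };
	ScopedGuardSketch<decltype(cleanup)> guard(cleanup);
	if (handed_over) {
		/*
		 * In the real code this is the point where, e.g.,
		 * index_def_list_add() has taken ownership of new_def,
		 * so the guard must not free it.
		 */
		guard.is_active = false;
	}
	/* Otherwise new_def is freed when the guard goes out of scope. */
}

int main()
{
	guard_example(false);
	return 0;
}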
1472
/**
1473
 * A trigger which is invoked on replace in a data dictionary
1474
 * space _space.
1475
 *
1476
 * Generally, whenever a data dictionary change occurs
1477
 * 2 things should be done:
1478
 *
1479
 * - space cache should be updated, and changes in the space
1480
 *   cache should be reflected in Lua bindings
1481
 *   (this is done in space_cache_replace() and
1482
 *   space_cache_delete())
1483
 *
1484
 * - the space which is changed should be rebuilt according
1485
 *   to the nature of the modification, i.e. indexes added/dropped,
1486
 *   tuple format changed, etc.
1487
 *
1488
 * When dealing with an update of _space space, we have 3 major
1489
 * cases:
1490
 *
1491
 * 1) insert a new tuple: creates a new space
1492
 *    The trigger prepares a space structure to insert
1493
 *    into the space cache and registers an on-commit
1494
 *    hook to perform the registration. Should the statement
1495
 *    itself fail, the transaction is rolled back, and the transaction
1496
 *    rollback hook must be there to delete the created space
1497
 *    object, avoiding a memory leak. The hooks are written
1498
 *    in a way that excludes the possibility of a failure.
1499
 *
1500
 * 2) delete a tuple: drops an existing space.
1501
 *
1502
 *    A space can be dropped only if it has no indexes.
1503
 *    The only reason for this restriction is that there
1504
 *    must be no tuples in _index without a corresponding tuple
1505
 *    in _space. It's not possible to delete such tuples
1506
 *    automatically (this would require multi-statement
1507
 *    transactions), so instead the trigger verifies that the
1508
 *    records have been deleted by the user.
1509
 *
1510
 *    Then the trigger registers transaction commit hook to
1511
 *    perform the deletion from the space cache.  No rollback hook
1512
 *    is required: if the transaction is rolled back, nothing is
1513
 *    done.
1514
 *
1515
 * 3) modify an existing tuple: some space
1516
 *    properties are immutable, but it's OK to change
1517
 *    space name or field count. This is done in WAL-error-
1518
 *    safe mode.
1519
 *
1520
 * A note about memcached_space: Tarantool 1.4 had a check
1521
 * which prevented re-definition of memcached_space. With
1522
 * dynamic space configuration such a check would be particularly
1523
 * clumsy, so it is simply not done.
1524
 */
1525
static void
1526
on_replace_dd_space(struct trigger * /* trigger */, void *event)
150,590✔
1527
{
1528
        struct txn *txn = (struct txn *) event;
150,590✔
1529
        txn_check_singlestatement_xc(txn, "Space _space");
150,590✔
1530
        struct txn_stmt *stmt = txn_current_stmt(txn);
150,589✔
1531
        struct tuple *old_tuple = stmt->old_tuple;
150,589✔
1532
        struct tuple *new_tuple = stmt->new_tuple;
150,589✔
1533
        struct region *region = &fiber()->gc;
150,589✔
1534
        /*
1535
         * Things to keep in mind:
1536
         * - old_tuple is set only in case of UPDATE.  For INSERT
1537
         *   or REPLACE it is NULL.
1538
         * - the trigger may be called inside recovery from a snapshot,
1539
         *   when index lookup is not possible
1540
         * - _space, _index and other metaspaces initially don't
1541
         *   have a tuple which represents them; this tuple is only
1542
         *   created during recovery from a snapshot.
1543
         *
1544
         * Let's establish whether an old space exists. Use
1545
         * old_tuple ID field, if old_tuple is set, since UPDATE
1546
         * may have changed space id.
1547
         */
1548
        uint32_t old_id = tuple_field_u32_xc(old_tuple ? old_tuple : new_tuple,
150,589✔
1549
                                             BOX_SPACE_FIELD_ID);
150,589✔
1550
        struct space *old_space = space_by_id(old_id);
150,589✔
1551
        if (new_tuple != NULL && old_space == NULL) { /* INSERT */
150,589✔
1552
                struct space_def *def =
1553
                        space_def_new_from_tuple(new_tuple, ER_CREATE_SPACE,
1554
                                                 region);
73,696✔
1555
                access_check_ddl(def->name, def->uid, SC_SPACE, PRIV_C, true);
73,669✔
1556
                auto def_guard =
1557
                        make_scoped_guard([=] { space_def_delete(def); });
221,007✔
1558
                RLIST_HEAD(empty_list);
73,669✔
1559
                struct space *space = space_new_xc(def, &empty_list);
73,669✔
1560
                /**
1561
                 * The new space must be inserted in the space
1562
                 * cache right away to achieve linearisable
1563
                 * execution on a replica.
1564
                 */
1565
                (void) space_cache_replace(space);
73,668✔
1566
                /*
1567
                 * It may so happen that until the DDL change record
1568
                 * is written to the WAL, the space is used for
1569
                 * insert/update/delete. All these updates are
1570
                 * rolled back by the pipelined rollback mechanism,
1571
                 * so it's safe to simply drop the space on
1572
                 * rollback.
1573
                 */
1574
                struct trigger *on_commit =
1575
                        txn_alter_trigger_new(on_create_space_commit, space);
73,668✔
1576
                txn_on_commit(txn, on_commit);
73,668✔
1577
                struct trigger *on_rollback =
1578
                        txn_alter_trigger_new(on_create_space_rollback, space);
73,668✔
1579
                txn_on_rollback(txn, on_rollback);
147,336✔
1580
        } else if (new_tuple == NULL) { /* DELETE */
76,893✔
1581
                access_check_ddl(old_space->def->name, old_space->def->uid,
68,566✔
1582
                                 SC_SPACE, PRIV_D, true);
68,566✔
1583
                /* Verify that the space is empty (has no indexes) */
1584
                if (old_space->index_count) {
68,565✔
1585
                        tnt_raise(ClientError, ER_DROP_SPACE,
2✔
1586
                                  space_name(old_space),
1587
                                  "the space has indexes");
1588
                }
1589
                if (schema_find_grants("space", old_space->def->id)) {
68,563✔
1590
                        tnt_raise(ClientError, ER_DROP_SPACE,
×
1591
                                  space_name(old_space),
1592
                                  "the space has grants");
1593
                }
1594
                /*
1595
                 * Before 1.7.6 a space record was removed before
1596
                 * the corresponding record in the _truncate system
1597
                 * space so the following check should be disabled.
1598
                 */
1599
                if (dd_version_id >= version_id(1, 7, 6) &&
137,119✔
1600
                    space_has_data(BOX_TRUNCATE_ID, 0, old_space->def->id))
68,556✔
1601
                        tnt_raise(ClientError, ER_DROP_SPACE,
2✔
1602
                                  space_name(old_space),
1603
                                  "the space has truncate record");
1604
                /**
1605
                 * The space must be deleted from the space
1606
                 * cache right away to achieve linearisable
1607
                 * execution on a replica.
1608
                 */
1609
                struct space *space = space_cache_delete(space_id(old_space));
68,561✔
1610
                struct trigger *on_commit =
1611
                        txn_alter_trigger_new(on_drop_space_commit, space);
68,561✔
1612
                txn_on_commit(txn, on_commit);
68,561✔
1613
                struct trigger *on_rollback =
1614
                        txn_alter_trigger_new(on_drop_space_rollback, space);
68,561✔
1615
                txn_on_rollback(txn, on_rollback);
68,561✔
1616
        } else { /* UPDATE, REPLACE */
1617
                assert(old_space != NULL && new_tuple != NULL);
8,327✔
1618
                struct space_def *def =
1619
                        space_def_new_from_tuple(new_tuple, ER_ALTER_SPACE,
1620
                                                 region);
8,327✔
1621
                access_check_ddl(def->name, def->uid, SC_SPACE, PRIV_A, true);
8,305✔
1622
                auto def_guard =
1623
                        make_scoped_guard([=] { space_def_delete(def); });
16,611✔
1624
                if (def->id != space_id(old_space))
8,305✔
1625
                        tnt_raise(ClientError, ER_ALTER_SPACE,
×
1626
                                  space_name(old_space),
1627
                                  "space id is immutable");
1628
                if (strcmp(def->engine_name, old_space->def->engine_name) != 0)
8,305✔
1629
                        tnt_raise(ClientError, ER_ALTER_SPACE,
1✔
1630
                                  space_name(old_space),
1631
                                  "can not change space engine");
1632
                /*
1633
                 * Allow change of space properties, but do it
1634
                 * in WAL-error-safe mode.
1635
                 */
1636
                struct alter_space *alter = alter_space_new(old_space);
8,304✔
1637
                auto alter_guard =
1638
                        make_scoped_guard([=] {alter_space_delete(alter);});
16,646✔
1639
                /*
1640
                 * Calculate a new min_field_count. It can be
1641
                 * changed by resetting space:format(), if an old
1642
                 * format covers some nullable indexed fields in
1643
                 * the format tail. And when the format is reset,
1644
                 * these fields become optional - index
1645
                 * comparators must be updated.
1646
                 */
1647
                struct key_def **keys;
1648
                size_t bsize = old_space->index_count * sizeof(keys[0]);
8,304✔
1649
                keys = (struct key_def **) region_alloc_xc(&fiber()->gc,
8,304✔
1650
                                                           bsize);
8,304✔
1651
                for (uint32_t i = 0; i < old_space->index_count; ++i)
16,256✔
1652
                        keys[i] = old_space->index[i]->def->key_def;
7,952✔
1653
                alter->new_min_field_count =
8,304✔
1654
                        tuple_format_min_field_count(keys,
16,608✔
1655
                                                     old_space->index_count,
8,304✔
1656
                                                     def->fields,
8,304✔
1657
                                                     def->field_count);
1658
                (void) new CheckSpaceFormat(alter);
8,304✔
1659
                (void) new ModifySpace(alter, def);
8,304✔
1660
                def_guard.is_active = false;
8,304✔
1661
                /* Create MoveIndex ops for all space indexes. */
1662
                alter_space_move_indexes(alter, 0, old_space->index_id_max + 1);
8,304✔
1663
                alter_space_do(txn, alter);
8,304✔
1664
                alter_guard.is_active = false;
8,266✔
1665
        }
1666
}
150,495✔
1667

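/*
 * Both this trigger and on_replace_dd_index() below recompute
 * alter->new_min_field_count with tuple_format_min_field_count().
 * Roughly, the minimal tuple length is dictated by the last field that
 * is non-nullable either in the user-defined format or in some key
 * part. A simplified standalone sketch of that idea (an editor's
 * approximation with invented FieldSketch/KeyPartSketch types; the real
 * function lives in tuple_format.c):
 */
#include <algorithm>
#include <cstdint>
#include <vector>

struct FieldSketch { bool is_nullable; };
struct KeyPartSketch { uint32_t fieldno; bool is_nullable; };
using KeyDefSketch = std::vector<KeyPartSketch>;

static uint32_t
min_field_count_sketch(const std::vector<KeyDefSketch> &keys,
		       const std::vector<FieldSketch> &format)
{
	uint32_t min_count = 0;
	/* Every non-nullable format field must be present. */
	for (uint32_t i = 0; i < format.size(); ++i) {
		if (!format[i].is_nullable)
			min_count = i + 1;
	}
	/* So must every field referenced by a non-nullable key part. */
	for (const KeyDefSketch &key : keys) {
		for (const KeyPartSketch &part : key) {
			if (!part.is_nullable)
				min_count = std::max<uint32_t>(min_count,
							       part.fieldno + 1);
		}
	}
	return min_count;
}
/*
 * For instance, with a primary key on field 0 and a secondary key whose
 * only part on field 1 is nullable, the sketch returns 1 -- the
 * situation discussed in the long comment inside on_replace_dd_index().
 */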
1668
/**
1669
 * Just like with _space, 3 major cases:
1670
 *
1671
 * - insert a tuple = addition of a new index. The
1672
 *   space should exist.
1673
 *
1674
 * - delete a tuple - drop index.
1675
 *
1676
 * - update a tuple - change of index type or key parts.
1677
 *   Change of index type is the same as deletion of the old
1678
 *   index and addition of the new one.
1679
 *
1680
 *   A new index needs to be built before we attempt to commit
1681
 *   a record to the write ahead log, since:
1682
 *
1683
 *   1) if it fails, it's not good to end up with a corrupt index
1684
 *   which is already committed to WAL
1685
 *
1686
 *   2) Tarantool indexes also work as constraints (min number of
1687
 *   fields in the space, field uniqueness), and it's not good to
1688
 *   commit to WAL a constraint which is not enforced in the
1689
 *   current data set.
1690
 *
1691
 *   When adding a new index, ideally we'd also need to rebuild
1692
 *   all tuple formats in all tuples, since the old format may not
1693
 *   be ideal for the new index. We, however, do not do that,
1694
 *   since that would entail rebuilding all indexes at once.
1695
 *   Instead, the default tuple format of the space is changed,
1696
 *   and as tuples get updated/replaced, all tuples acquire a new
1697
 *   format.
1698
 *
1699
 *   The same is the case with dropping an index: nothing is
1700
 *   rebuilt right away, but gradually the extra space reserved
1701
 *   for offsets is relinquished to the slab allocator as tuples
1702
 *   are modified.
1703
 */
1704
static void
1705
on_replace_dd_index(struct trigger * /* trigger */, void *event)
34,209✔
1706
{
1707
        struct txn *txn = (struct txn *) event;
34,209✔
1708
        txn_check_singlestatement_xc(txn, "Space _index");
34,209✔
1709
        struct txn_stmt *stmt = txn_current_stmt(txn);
34,207✔
1710
        struct tuple *old_tuple = stmt->old_tuple;
34,207✔
1711
        struct tuple *new_tuple = stmt->new_tuple;
34,207✔
1712
        uint32_t id = tuple_field_u32_xc(old_tuple ? old_tuple : new_tuple,
34,207✔
1713
                                         BOX_INDEX_FIELD_SPACE_ID);
34,207✔
1714
        uint32_t iid = tuple_field_u32_xc(old_tuple ? old_tuple : new_tuple,
34,207✔
1715
                                          BOX_INDEX_FIELD_ID);
34,207✔
1716
        struct space *old_space = space_cache_find_xc(id);
34,207✔
1717
        enum priv_type priv_type = new_tuple ? PRIV_C : PRIV_D;
34,207✔
1718
        if (old_tuple && new_tuple)
34,207✔
1719
                priv_type = PRIV_A;
346✔
1720
        access_check_ddl(old_space->def->name, old_space->def->uid, SC_SPACE,
34,207✔
1721
                         priv_type, true);
34,207✔
1722
        struct index *old_index = space_index(old_space, iid);
34,206✔
1723

1724
        /*
1725
         * Deal with various cases of dropping of the primary key.
1726
         */
1727
        if (iid == 0 && new_tuple == NULL) {
34,206✔
1728
                /*
1729
                 * Dropping the primary key in a system space: off limits.
1730
                 */
1731
                if (space_is_system(old_space))
2,382✔
1732
                        tnt_raise(ClientError, ER_LAST_DROP,
1✔
1733
                                  space_name(old_space));
1734
                /*
1735
                 * Can't drop primary key before secondary keys.
1736
                 */
1737
                if (old_space->index_count > 1) {
2,381✔
1738
                        tnt_raise(ClientError, ER_DROP_PRIMARY_KEY,
1✔
1739
                                  space_name(old_space));
1740
                }
1741
                /*
1742
                 * Can't drop primary key before space sequence.
1743
                 */
1744
                if (old_space->sequence != NULL) {
2,380✔
1745
                        tnt_raise(ClientError, ER_ALTER_SPACE,
1✔
1746
                                  space_name(old_space),
1747
                                  "can not drop primary key while "
1748
                                  "space sequence exists");
1749
                }
1750
        }
1751

1752
        if (iid != 0 && space_index(old_space, 0) == NULL) {
34,203✔
1753
                /*
1754
                 * A secondary index can not be created without
1755
                 * a primary key.
1756
                 */
1757
                tnt_raise(ClientError, ER_ALTER_SPACE,
×
1758
                          space_name(old_space),
1759
                          "can not add a secondary key before primary");
1760
        }
1761

1762
        struct alter_space *alter = alter_space_new(old_space);
34,203✔
1763
        auto scoped_guard =
1764
                make_scoped_guard([=] { alter_space_delete(alter); });
68,548✔
1765

1766
        /*
1767
         * Handle the following 4 cases:
1768
         * 1. Simple drop of an index.
1769
         * 2. Creation of a new index: primary or secondary.
1770
         * 3. Change of an index which does not require a rebuild.
1771
         * 4. Change of an index which does require a rebuild.
1772
         */
1773
        /*
1774
         * First, move all unchanged indexes from the old space
1775
         * to the new one.
1776
         */
1777
        /* Case 1: drop the index, if it is dropped. */
1778
        if (old_index != NULL && new_tuple == NULL) {
34,203✔
1779
                alter_space_move_indexes(alter, 0, iid);
3,127✔
1780
                (void) new DropIndex(alter, old_index->def);
3,127✔
1781
        }
1782
        /* Case 2: create an index, if it is simply created. */
1783
        if (old_index == NULL && new_tuple != NULL) {
34,203✔
1784
                alter_space_move_indexes(alter, 0, iid);
23,494✔
1785
                CreateIndex *create_index = new CreateIndex(alter);
23,494✔
1786
                create_index->new_index_def =
23,401✔
1787
                        index_def_new_from_tuple(new_tuple, old_space);
23,494✔
1788
                index_def_update_optionality(create_index->new_index_def,
23,401✔
1789
                                             alter->new_min_field_count);
23,401✔
1790
        }
1791
        /* Case 3 and 4: check if we need to rebuild index data. */
1792
        if (old_index != NULL && new_tuple != NULL) {
34,110✔
1793
                struct index_def *index_def;
1794
                index_def = index_def_new_from_tuple(new_tuple, old_space);
7,582✔
1795
                auto index_def_guard =
1796
                        make_scoped_guard([=] { index_def_delete(index_def); });
21,406✔
1797
                /*
1798
                 * To detect which key parts are optional,
1799
                 * min_field_count is required. But
1800
                 * min_field_count from the old space format can
1801
                 * not be used. For example, consider the case,
1802
                 * when a space has no format, has a primary index
1803
                 * on the first field and has a single secondary
1804
                 * index on a non-nullable second field. Min field
1805
                 * count here is 2. Now alter the secondary index
1806
                 * to make its part be nullable. In the
1807
                 * 'old_space' min_field_count is still 2, but
1808
                 * actually it is already 1. Actual
1809
                 * min_field_count must be calculated using old
1810
                 * unchanged indexes, NEW definition of an updated
1811
                 * index and a space format, defined by a user.
1812
                 */
1813
                struct key_def **keys;
1814
                size_t bsize = old_space->index_count * sizeof(keys[0]);
7,575✔
1815
                keys = (struct key_def **) region_alloc_xc(&fiber()->gc,
7,575✔
1816
                                                           bsize);
7,575✔
1817
                for (uint32_t i = 0, j = 0; i < old_space->index_count;
15,653✔
1818
                     ++i) {
1819
                        struct index_def *d = old_space->index[i]->def;
8,078✔
1820
                        if (d->iid != index_def->iid)
8,078✔
1821
                                keys[j++] = d->key_def;
503✔
1822
                        else
1823
                                keys[j++] = index_def->key_def;
7,575✔
1824
                }
1825
                struct space_def *def = old_space->def;
7,575✔
1826
                alter->new_min_field_count =
7,575✔
1827
                        tuple_format_min_field_count(keys,
15,150✔
1828
                                                     old_space->index_count,
7,575✔
1829
                                                     def->fields,
7,575✔
1830
                                                     def->field_count);
1831
                index_def_update_optionality(index_def,
7,575✔
1832
                                             alter->new_min_field_count);
7,575✔
1833
                alter_space_move_indexes(alter, 0, iid);
7,575✔
1834
                if (index_def_cmp(index_def, old_index->def) == 0) {
7,575✔
1835
                        /* Index is not changed so just move it. */
1836
                        (void) new MoveIndex(alter, old_index->def->iid);
6,256✔
1837
                } else if (index_def_change_requires_rebuild(old_index,
1,319✔
1838
                                                             index_def)) {
1839
                        /*
1840
                         * Operation demands an index rebuild.
1841
                         */
1842
                        (void) new RebuildIndex(alter, index_def,
1843
                                                old_index->def);
1,258✔
1844
                        index_def_guard.is_active = false;
1,258✔
1845
                } else {
1846
                        /*
1847
                         * Operation can be done without index rebuild,
1848
                         * but we still need to check that tuples stored
1849
                         * in the space conform to the new format.
1850
                         */
1851
                        (void) new CheckSpaceFormat(alter);
61✔
1852
                        (void) new ModifyIndex(alter, index_def,
1853
                                               old_index->def);
61✔
1854
                        index_def_guard.is_active = false;
61✔
1855
                }
1856
        }
1857
        /*
1858
         * Create MoveIndex ops for the remaining indexes in the
1859
         * old space.
1860
         */
1861
        alter_space_move_indexes(alter, iid + 1, old_space->index_id_max + 1);
34,103✔
1862
        alter_space_do(txn, alter);
34,103✔
1863
        scoped_guard.is_active = false;
34,061✔
1864
}
34,061✔
1865

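/*
 * The branching in on_replace_dd_index() boils down to a small decision
 * table over "does the old index exist", "is there a new tuple", and
 * "does the changed definition force a rebuild". A standalone sketch of
 * that decision (IndexAlterOp and classify_index_alter() are
 * editor-invented names; in the trigger itself the choice is made with
 * index_def_cmp() and index_def_change_requires_rebuild()):
 */
enum class IndexAlterOp { DROP, CREATE, MOVE, MODIFY, REBUILD };

static IndexAlterOp
classify_index_alter(bool old_index_exists, bool new_tuple_exists,
		     bool def_changed, bool requires_rebuild)
{
	if (old_index_exists && !new_tuple_exists)
		return IndexAlterOp::DROP;	/* case 1 */
	if (!old_index_exists && new_tuple_exists)
		return IndexAlterOp::CREATE;	/* case 2 */
	/* Both exist: the index definition was replaced. */
	if (!def_changed)
		return IndexAlterOp::MOVE;	/* definition unchanged */
	if (requires_rebuild)
		return IndexAlterOp::REBUILD;	/* case 4 */
	return IndexAlterOp::MODIFY;		/* case 3 */
}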
1866
/**
1867
 * A trigger invoked on replace in space _truncate.
1868
 *
1869
 * In a nutshell, we truncate a space by replacing it with
1870
 * a new empty space with the same definition and indexes.
1871
 * Note, although we instantiate the new space before WAL
1872
 * write, we don't propagate changes to the old space in
1873
 * case a WAL write error happens and we have to rollback.
1874
 * This is OK, because a WAL write error implies cascading
1875
 * rollback of all transactions following this one.
1876
 */
1877
static void
1878
on_replace_dd_truncate(struct trigger * /* trigger */, void *event)
2,282✔
1879
{
1880
        struct txn *txn = (struct txn *) event;
2,282✔
1881
        struct txn_stmt *stmt = txn_current_stmt(txn);
2,282✔
1882
        txn_check_singlestatement_xc(txn, "Space _truncate");
2,282✔
1883
        struct tuple *new_tuple = stmt->new_tuple;
2,282✔
1884

1885
        if (new_tuple == NULL) {
2,282✔
1886
                /* Space drop - nothing to do. */
1887
                return;
388✔
1888
        }
1889

1890
        uint32_t space_id =
1891
                tuple_field_u32_xc(new_tuple, BOX_TRUNCATE_FIELD_SPACE_ID);
2,097✔
1892
        struct space *old_space = space_cache_find_xc(space_id);
2,097✔
1893

1894
        if (stmt->row->type == IPROTO_INSERT) {
2,097✔
1895
                /*
1896
                 * Space creation during initial recovery -
1897
                 * nothing to do.
1898
                 */
1899
                return;
18✔
1900
        }
1901

1902
        /*
1903
         * System spaces use triggers to keep records in sync
1904
         * with internal objects. Since space truncation doesn't
1905
         * invoke triggers, we don't permit it for system spaces.
1906
         */
1907
        if (space_is_system(old_space))
2,079✔
1908
                tnt_raise(ClientError, ER_TRUNCATE_SYSTEM_SPACE,
4✔
1909
                          space_name(old_space));
1910

1911
        /*
1912
         * Check if a write privilege was given, raise an error if not.
1913
         */
1914
        access_check_space_xc(old_space, PRIV_W);
2,075✔
1915

1916
        struct alter_space *alter = alter_space_new(old_space);
2,073✔
1917
        auto scoped_guard =
1918
                make_scoped_guard([=] { alter_space_delete(alter); });
4,147✔
1919

1920
        /*
1921
         * Recreate all indexes of the truncated space.
1922
         */
1923
        for (uint32_t i = 0; i < old_space->index_count; i++) {
5,433✔
1924
                struct index *old_index = old_space->index[i];
3,360✔
1925
                (void) new TruncateIndex(alter, old_index->def->iid);
3,360✔
1926
        }
1927

1928
        alter_space_do(txn, alter);
2,073✔
1929
        scoped_guard.is_active = false;
2,072✔
1930
}
1931

1932
/* {{{ access control */
1933

1934
bool
1935
user_has_data(struct user *user)
233✔
1936
{
1937
        uint32_t uid = user->def->uid;
233✔
1938
        uint32_t spaces[] = { BOX_SPACE_ID, BOX_FUNC_ID, BOX_SEQUENCE_ID,
1939
                              BOX_PRIV_ID, BOX_PRIV_ID };
233✔
1940
        /*
1941
         * owner index id #1 for _space and _func and _priv.
1942
         * For _priv also check that the user has no grants.
1943
         */
1944
        uint32_t indexes[] = { 1, 1, 1, 1, 0 };
233✔
1945
        uint32_t count = sizeof(spaces)/sizeof(*spaces);
233✔
1946
        for (uint32_t i = 0; i < count; i++) {
1,392✔
1947
                if (space_has_data(spaces[i], indexes[i], uid))
1,162✔
1948
                        return true;
3✔
1949
        }
1950
        if (! user_map_is_empty(&user->users))
230✔
1951
                return true;
×
1952
        /*
1953
         * If there was a role, the previous check would have
1954
         * returned true.
1955
         */
1956
        assert(user_map_is_empty(&user->roles));
230✔
1957
        return false;
230✔
1958
}
1959

1960
/**
1961
 * Supposedly a user may have many authentication mechanisms
1962
 * defined, but for now we only support chap-sha1. Get
1963
 * the chap-sha1 password from the _user space.
1964
 */
1965
void
1966
user_def_fill_auth_data(struct user_def *user, const char *auth_data)
4,559✔
1967
{
1968
        uint8_t type = mp_typeof(*auth_data);
9,118✔
1969
        if (type == MP_ARRAY || type == MP_NIL) {
4,559✔
1970
                /*
1971
                 * Nothing useful.
1972
                 * MP_ARRAY is a special case since Lua arrays are
1973
                 * indistinguishable from tables, so an empty
1974
                 * table may well be encoded as an msgpack array.
1975
                 * Treat as no data.
1976
                 */
1977
                return;
×
1978
        }
1979
        if (mp_typeof(*auth_data) != MP_MAP) {
9,118✔
1980
                /** Prevent users from making silly mistakes */
1981
                tnt_raise(ClientError, ER_CREATE_USER,
×
1982
                          user->name, "invalid password format, "
1983
                          "use box.schema.user.passwd() to reset password");
1984
        }
1985
        uint32_t mech_count = mp_decode_map(&auth_data);
4,559✔
1986
        for (uint32_t i = 0; i < mech_count; i++) {
4,559✔
1987
                if (mp_typeof(*auth_data) != MP_STR) {
2,622✔
1988
                        mp_next(&auth_data);
×
1989
                        mp_next(&auth_data);
×
1990
                        continue;
×
1991
                }
1992
                uint32_t len;
1993
                const char *mech_name = mp_decode_str(&auth_data, &len);
1,311✔
1994
                if (strncasecmp(mech_name, "chap-sha1", 9) != 0) {
1,311✔
1995
                        mp_next(&auth_data);
×
1996
                        continue;
×
1997
                }
1998
                const char *hash2_base64 = mp_decode_str(&auth_data, &len);
1,311✔
1999
                if (len != 0 && len != SCRAMBLE_BASE64_SIZE) {
1,311✔
2000
                        tnt_raise(ClientError, ER_CREATE_USER,
×
2001
                                  user->name, "invalid user password");
2002
                }
2003
                if (user->uid == GUEST) {
1,311✔
2004
                    /** Guest user is permitted to have empty password */
2005
                    if (strncmp(hash2_base64, CHAP_SHA1_EMPTY_PASSWORD, len))
1,209✔
2006
                        tnt_raise(ClientError, ER_GUEST_USER_PASSWORD);
1✔
2007
                }
2008

2009
                base64_decode(hash2_base64, len, user->hash2,
1,310✔
2010
                              sizeof(user->hash2));
1,310✔
2011
                break;
1,310✔
2012
        }
2013
}
2014

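/*
 * The function above expects the stored hash to follow the documented
 * scheme base64(sha1(sha1(password))), with the raw sha1(sha1(password))
 * ending up in user->hash2. A standalone sketch that reproduces the
 * hash with OpenSSL (an editor's illustration, not tarantool's own
 * scramble/base64 helpers); for an empty password the output should
 * match the CHAP_SHA1_EMPTY_PASSWORD constant checked for the guest
 * user above.
 *
 *   g++ chap_sha1_sketch.cc -lcrypto
 */
#include <openssl/evp.h>
#include <openssl/sha.h>
#include <cstdio>
#include <cstring>

int main()
{
	const char *password = "";	/* empty password, as for guest */
	unsigned char h1[SHA_DIGEST_LENGTH];
	unsigned char h2[SHA_DIGEST_LENGTH];
	/* hash2 = sha1(sha1(password)) -- what user->hash2 stores. */
	SHA1(reinterpret_cast<const unsigned char *>(password),
	     strlen(password), h1);
	SHA1(h1, sizeof(h1), h2);
	/* The _user tuple keeps the base64 encoding of hash2. */
	unsigned char b64[64];
	int len = EVP_EncodeBlock(b64, h2, sizeof(h2));
	printf("%.*s\n", len, reinterpret_cast<const char *>(b64));
	return 0;
}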
2015
static struct user_def *
2016
user_def_new_from_tuple(struct tuple *tuple)
4,638✔
2017
{
2018
        uint32_t name_len;
2019
        const char *name = tuple_field_str_xc(tuple, BOX_USER_FIELD_NAME,
2020
                                              &name_len);
4,638✔
2021
        if (name_len > BOX_NAME_MAX) {
4,638✔
2022
                tnt_raise(ClientError, ER_CREATE_USER,
3✔
2023
                          tt_cstr(name, BOX_INVALID_NAME_MAX),
2024
                          "user name is too long");
2025
        }
2026
        size_t size = user_def_sizeof(name_len);
4,635✔
2027
        /* Use calloc: in case user password is empty, fill it with \0 */
2028
        struct user_def *user = (struct user_def *) malloc(size);
4,635✔
2029
        if (user == NULL)
4,635✔
2030
                tnt_raise(OutOfMemory, size, "malloc", "user");
×
2031
        auto def_guard = make_scoped_guard([=] { free(user); });
9,303✔
2032
        user->uid = tuple_field_u32_xc(tuple, BOX_USER_FIELD_ID);
4,635✔
2033
        user->owner = tuple_field_u32_xc(tuple, BOX_USER_FIELD_UID);
4,635✔
2034
        const char *user_type =
2035
                tuple_field_cstr_xc(tuple, BOX_USER_FIELD_TYPE);
4,635✔
2036
        user->type= schema_object_type(user_type);
4,635✔
2037
        memcpy(user->name, name, name_len);
4,635✔
2038
        user->name[name_len] = 0;
4,635✔
2039
        if (user->type != SC_ROLE && user->type != SC_USER) {
4,635✔
2040
                tnt_raise(ClientError, ER_CREATE_USER,
1✔
2041
                          user->name, "unknown user type");
2042
        }
2043
        identifier_check_xc(user->name, name_len);
4,634✔
2044
        /*
2045
         * AUTH_DATA field in _user space should contain
2046
         * chap-sha1 -> base64_encode(sha1(sha1(password), 0).
2047
         * Check for trivial errors when a plain text
2048
         * password is saved in this field instead.
2049
         */
2050
        if (tuple_field_count(tuple) > BOX_USER_FIELD_AUTH_MECH_LIST) {
4,604✔
2051
                const char *auth_data =
2052
                        tuple_field(tuple, BOX_USER_FIELD_AUTH_MECH_LIST);
4,560✔
2053
                const char *tmp = auth_data;
4,560✔
2054
                bool is_auth_empty;
2055
                if (mp_typeof(*auth_data) == MP_ARRAY &&
9,120✔
2056
                    mp_decode_array(&tmp) == 0) {
×
2057
                        is_auth_empty = true;
×
2058
                } else if (mp_typeof(*auth_data) == MP_MAP &&
13,680✔
2059
                           mp_decode_map(&tmp) == 0) {
4,560✔
2060
                        is_auth_empty = true;
3,248✔
2061
                } else {
2062
                        is_auth_empty = false;
1,312✔
2063
                }
2064
                if (!is_auth_empty && user->type == SC_ROLE)
4,560✔
2065
                        tnt_raise(ClientError, ER_CREATE_ROLE, user->name,
1✔
2066
                                  "authentication data can not be set for a "\
2067
                                  "role");
2068
                user_def_fill_auth_data(user, auth_data);
4,559✔
2069
        }
2070
        def_guard.is_active = false;
4,602✔
2071
        return user;
9,204✔
2072
}
2073

2074
static void
2075
user_cache_remove_user(struct trigger * /* trigger */, void *event)
230✔
2076
{
2077
        struct txn *txn = (struct txn *) event;
230✔
2078
        struct txn_stmt *stmt = txn_last_stmt(txn);
230✔
2079
        uint32_t uid = tuple_field_u32_xc(stmt->old_tuple ?
230✔
2080
                                       stmt->old_tuple : stmt->new_tuple,
2081
                                       BOX_USER_FIELD_ID);
230✔
2082
        user_cache_delete(uid);
230✔
2083
}
230✔
2084

2085
static void
2086
user_cache_alter_user(struct trigger * /* trigger */, void *event)
1,246✔
2087
{
2088
        struct txn *txn = (struct txn *) event;
1,246✔
2089
        struct txn_stmt *stmt = txn_last_stmt(txn);
1,246✔
2090
        struct user_def *user = user_def_new_from_tuple(stmt->new_tuple);
1,246✔
2091
        auto def_guard = make_scoped_guard([=] { free(user); });
2,492✔
2092
        /* Can throw if, e.g. too many users. */
2093
        user_cache_replace(user);
1,246✔
2094
        def_guard.is_active = false;
1,246✔
2095
}
1,246✔
2096

2097
/**
2098
 * A trigger invoked on replace in the user table.
2099
 */
2100
static void
2101
on_replace_dd_user(struct trigger * /* trigger */, void *event)
3,637✔
2102
{
2103
        struct txn *txn = (struct txn *) event;
3,637✔
2104
        struct txn_stmt *stmt = txn_current_stmt(txn);
3,637✔
2105
        txn_check_singlestatement_xc(txn, "Space _user");
3,637✔
2106
        struct tuple *old_tuple = stmt->old_tuple;
3,634✔
2107
        struct tuple *new_tuple = stmt->new_tuple;
3,634✔
2108

2109
        uint32_t uid = tuple_field_u32_xc(old_tuple ? old_tuple : new_tuple,
3,634✔
2110
                                          BOX_USER_FIELD_ID);
3,634✔
2111
        struct user *old_user = user_by_id(uid);
3,634✔
2112
        if (new_tuple != NULL && old_user == NULL) { /* INSERT */
3,634✔
2113
                struct user_def *user = user_def_new_from_tuple(new_tuple);
2,145✔
2114
                access_check_ddl(user->name, user->owner, SC_USER, PRIV_C, true);
2,110✔
2115
                auto def_guard = make_scoped_guard([=] { free(user); });
4,221✔
2116
                (void) user_cache_replace(user);
2,110✔
2117
                def_guard.is_active = false;
2,109✔
2118
                struct trigger *on_rollback =
2119
                        txn_alter_trigger_new(user_cache_remove_user, NULL);
2,109✔
2120
                txn_on_rollback(txn, on_rollback);
4,218✔
2121
        } else if (new_tuple == NULL) { /* DELETE */
1,489✔
2122
                access_check_ddl(old_user->def->name, old_user->def->owner,
242✔
2123
                                 SC_USER, PRIV_D, true);
242✔
2124
                /* Can't drop guest or super user */
2125
                if (uid <= (uint32_t) BOX_SYSTEM_USER_ID_MAX || uid == SUPER) {
241✔
2126
                        tnt_raise(ClientError, ER_DROP_USER,
8✔
2127
                                  old_user->def->name,
2128
                                  "the user or the role is a system");
2129
                }
2130
                /*
2131
                 * Can only delete user if it has no spaces,
2132
                 * no functions and no grants.
2133
                 */
2134
                if (user_has_data(old_user)) {
233✔
2135
                        tnt_raise(ClientError, ER_DROP_USER,
3✔
2136
                                  old_user->def->name, "the user has objects");
2137
                }
2138
                struct trigger *on_commit =
2139
                        txn_alter_trigger_new(user_cache_remove_user, NULL);
230✔
2140
                txn_on_commit(txn, on_commit);
230✔
2141
        } else { /* UPDATE, REPLACE */
2142
                assert(old_user != NULL && new_tuple != NULL);
1,247✔
2143
                /*
2144
                 * Allow change of user properties (name,
2145
                 * password) but first check that the change is
2146
                 * correct.
2147
                 */
2148
                struct user_def *user = user_def_new_from_tuple(new_tuple);
1,247✔
2149
                access_check_ddl(user->name, user->uid, SC_USER, PRIV_A,
1,246✔
2150
                                 true);
1,246✔
2151
                auto def_guard = make_scoped_guard([=] { free(user); });
3,738✔
2152
                struct trigger *on_commit =
2153
                        txn_alter_trigger_new(user_cache_alter_user, NULL);
1,246✔
2154
                txn_on_commit(txn, on_commit);
1,246✔
2155
        }
2156
}
3,585✔
2157

2158
/**
2159
 * Get function identifiers from a tuple.
2160
 *
2161
 * @param tuple Tuple to get ids from.
2162
 * @param[out] fid Function identifier.
2163
 * @param[out] uid Owner identifier.
2164
 */
2165
static inline void
2166
func_def_get_ids_from_tuple(const struct tuple *tuple, uint32_t *fid,
128,937✔
2167
                            uint32_t *uid)
2168
{
2169
        *fid = tuple_field_u32_xc(tuple, BOX_FUNC_FIELD_ID);
128,937✔
2170
        *uid = tuple_field_u32_xc(tuple, BOX_FUNC_FIELD_UID);
128,937✔
2171
}
128,937✔
2172

2173
/** Create a function definition from tuple. */
2174
static struct func_def *
2175
func_def_new_from_tuple(const struct tuple *tuple)
64,809✔
2176
{
2177
        uint32_t len;
2178
        const char *name = tuple_field_str_xc(tuple, BOX_FUNC_FIELD_NAME,
2179
                                              &len);
64,809✔
2180
        if (len > BOX_NAME_MAX)
64,809✔
2181
                tnt_raise(ClientError, ER_CREATE_FUNCTION,
2✔
2182
                          tt_cstr(name, BOX_INVALID_NAME_MAX),
2183
                          "function name is too long");
2184
        identifier_check_xc(name, len);
64,807✔
2185
        struct func_def *def = (struct func_def *) malloc(func_def_sizeof(len));
64,792✔
2186
        if (def == NULL)
64,792✔
2187
                tnt_raise(OutOfMemory, func_def_sizeof(len), "malloc", "def");
×
2188
        auto def_guard = make_scoped_guard([=] { free(def); });
129,585✔
2189
        func_def_get_ids_from_tuple(tuple, &def->fid, &def->uid);
64,792✔
2190
        memcpy(def->name, name, len);
64,792✔
2191
        def->name[len] = 0;
64,792✔
2192
        if (tuple_field_count(tuple) > BOX_FUNC_FIELD_SETUID)
64,792✔
2193
                def->setuid = tuple_field_u32_xc(tuple, BOX_FUNC_FIELD_SETUID);
64,792✔
2194
        else
2195
                def->setuid = false;
×
2196
        if (tuple_field_count(tuple) > BOX_FUNC_FIELD_LANGUAGE) {
64,792✔
2197
                const char *language =
2198
                        tuple_field_cstr_xc(tuple, BOX_FUNC_FIELD_LANGUAGE);
64,782✔
2199
                def->language = STR2ENUM(func_language, language);
64,782✔
2200
                if (def->language == func_language_MAX) {
64,782✔
2201
                        tnt_raise(ClientError, ER_FUNCTION_LANGUAGE,
1✔
2202
                                  language, def->name);
2203
                }
2204
        } else {
2205
                /* Lua is the default. */
2206
                def->language = FUNC_LANGUAGE_LUA;
10✔
2207
        }
2208
        def_guard.is_active = false;
64,791✔
2209
        return def;
129,582✔
2210
}
2211

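/*
 * STR2ENUM(func_language, language) above resolves the language string
 * to an enum value and yields the func_language_MAX sentinel when
 * nothing matches, which is exactly what the error branch checks. A
 * standalone sketch of that string-to-enum idiom (an editor's
 * simplification with invented sketch_language names; the real macro is
 * defined elsewhere in the source tree):
 */
#include <strings.h>

enum sketch_language { SKETCH_LANG_LUA = 0, SKETCH_LANG_C, sketch_language_MAX };

static const char *sketch_language_strs[] = { "LUA", "C" };

static enum sketch_language
sketch_str2enum(const char *str)
{
	for (int i = 0; i < sketch_language_MAX; ++i) {
		if (strcasecmp(str, sketch_language_strs[i]) == 0)
			return static_cast<enum sketch_language>(i);
	}
	/* The caller turns the sentinel into ER_FUNCTION_LANGUAGE. */
	return sketch_language_MAX;
}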
2212
/** Remove a function from function cache */
2213
static void
2214
func_cache_remove_func(struct trigger * /* trigger */, void *event)
64,143✔
2215
{
2216
        struct txn_stmt *stmt = txn_last_stmt((struct txn *) event);
64,143✔
2217
        uint32_t fid = tuple_field_u32_xc(stmt->old_tuple ?
64,143✔
2218
                                       stmt->old_tuple : stmt->new_tuple,
2219
                                       BOX_FUNC_FIELD_ID);
64,143✔
2220
        func_cache_delete(fid);
64,143✔
2221
}
64,143✔
2222

2223
/** Replace a function in the function cache */
2224
static void
2225
func_cache_replace_func(struct trigger * /* trigger */, void *event)
10✔
2226
{
2227
        struct txn_stmt *stmt = txn_last_stmt((struct txn*) event);
10✔
2228
        struct func_def *def = func_def_new_from_tuple(stmt->new_tuple);
10✔
2229
        auto def_guard = make_scoped_guard([=] { free(def); });
20✔
2230
        func_cache_replace(def);
10✔
2231
        def_guard.is_active = false;
10✔
2232
}
10✔
2233

2234
/**
2235
 * A trigger invoked on replace in a space containing
2236
 * functions, on which grants may be defined.
2237
 */
2238
static void
2239
on_replace_dd_func(struct trigger * /* trigger */, void *event)
128,946✔
2240
{
2241
        struct txn *txn = (struct txn *) event;
128,946✔
2242
        txn_check_singlestatement_xc(txn, "Space _func");
128,946✔
2243
        struct txn_stmt *stmt = txn_current_stmt(txn);
128,944✔
2244
        struct tuple *old_tuple = stmt->old_tuple;
128,944✔
2245
        struct tuple *new_tuple = stmt->new_tuple;
128,944✔
2246

2247
        uint32_t fid = tuple_field_u32_xc(old_tuple ? old_tuple : new_tuple,
128,944✔
2248
                                          BOX_FUNC_FIELD_ID);
128,944✔
2249
        struct func *old_func = func_by_id(fid);
128,944✔
2250
        if (new_tuple != NULL && old_func == NULL) { /* INSERT */
128,944✔
2251
                struct func_def *def = func_def_new_from_tuple(new_tuple);
64,789✔
2252
                access_check_ddl(def->name, def->uid, SC_FUNCTION, PRIV_C, true);
64,771✔
2253
                auto def_guard = make_scoped_guard([=] { free(def); });
129,544✔
2254
                func_cache_replace(def);
64,771✔
2255
                def_guard.is_active = false;
64,769✔
2256
                struct trigger *on_rollback =
2257
                        txn_alter_trigger_new(func_cache_remove_func, NULL);
64,769✔
2258
                txn_on_rollback(txn, on_rollback);
129,538✔
2259
        } else if (new_tuple == NULL) {         /* DELETE */
64,155✔
2260
                uint32_t uid;
2261
                func_def_get_ids_from_tuple(old_tuple, &fid, &uid);
64,145✔
2262
                /*
2263
                 * Can only delete func if you're the one
2264
                 * who created it or a superuser.
2265
                 */
2266
                access_check_ddl(old_func->def->name, uid, SC_FUNCTION,
64,145✔
2267
                                 PRIV_D, true);
128,290✔
2268
                /* Can only delete func if it has no grants. */
2269
                if (schema_find_grants("function", old_func->def->fid)) {
64,143✔
2270
                        tnt_raise(ClientError, ER_DROP_FUNCTION,
×
2271
                                  (unsigned) old_func->def->uid,
2272
                                  "function has grants");
2273
                }
2274
                struct trigger *on_commit =
2275
                        txn_alter_trigger_new(func_cache_remove_func, NULL);
64,143✔
2276
                txn_on_commit(txn, on_commit);
64,143✔
2277
        } else {                                /* UPDATE, REPLACE */
2278
                struct func_def *def = func_def_new_from_tuple(new_tuple);
10✔
2279
                auto def_guard = make_scoped_guard([=] { free(def); });
30✔
2280
                access_check_ddl(def->name, def->uid, SC_FUNCTION, PRIV_A,
10✔
2281
                                 true);
10✔
2282
                struct trigger *on_commit =
2283
                        txn_alter_trigger_new(func_cache_replace_func, NULL);
10✔
2284
                txn_on_commit(txn, on_commit);
10✔
2285
        }
2286
}
128,922✔
2287

2288
/** Create a collation identifier definition from tuple. */
2289
void
2290
coll_id_def_new_from_tuple(const struct tuple *tuple, struct coll_id_def *def)
1,332✔
2291
{
2292
        memset(def, 0, sizeof(*def));
1,332✔
2293
        uint32_t name_len, locale_len, type_len;
2294
        def->id = tuple_field_u32_xc(tuple, BOX_COLLATION_FIELD_ID);
1,332✔
2295
        def->name = tuple_field_str_xc(tuple, BOX_COLLATION_FIELD_NAME, &name_len);
1,332✔
2296
        def->name_len = name_len;
1,332✔
2297
        if (name_len > BOX_NAME_MAX)
1,332✔
2298
                tnt_raise(ClientError, ER_CANT_CREATE_COLLATION,
1✔
2299
                          "collation name is too long");
2300
        identifier_check_xc(def->name, name_len);
1,331✔
2301

2302
        def->owner_id = tuple_field_u32_xc(tuple, BOX_COLLATION_FIELD_UID);
1,316✔
2303
        struct coll_def *base = &def->base;
1,316✔
2304
        const char *type = tuple_field_str_xc(tuple, BOX_COLLATION_FIELD_TYPE,
2305
                                              &type_len);
1,316✔
2306
        base->type = STRN2ENUM(coll_type, type, type_len);
1,316✔
2307
        if (base->type == coll_type_MAX)
1,316✔
2308
                tnt_raise(ClientError, ER_CANT_CREATE_COLLATION,
1✔
2309
                          "unknown collation type");
2310
        const char *locale =
2311
                tuple_field_str_xc(tuple, BOX_COLLATION_FIELD_LOCALE,
2312
                                   &locale_len);
1,315✔
2313
        if (locale_len > COLL_LOCALE_LEN_MAX)
1,315✔
2314
                tnt_raise(ClientError, ER_CANT_CREATE_COLLATION,
×
2315
                          "collation locale is too long");
2316
        if (locale_len > 0)
1,315✔
2317
                identifier_check_xc(locale, locale_len);
108✔
2318
        snprintf(base->locale, sizeof(base->locale), "%.*s", locale_len,
1,315✔
2319
                 locale);
2,630✔
2320
        const char *options =
2321
                tuple_field_with_type_xc(tuple, BOX_COLLATION_FIELD_OPTIONS,
1,315✔
2322
                                         MP_MAP);
1,315✔
2323

2324
        assert(base->type == COLL_TYPE_ICU);
1,315✔
2325
        if (opts_decode(&base->icu, coll_icu_opts_reg, &options,
1,315✔
2326
                        ER_WRONG_COLLATION_OPTIONS,
2327
                        BOX_COLLATION_FIELD_OPTIONS, NULL) != 0)
2328
                diag_raise();
5✔
2329

2330
        if (base->icu.french_collation == coll_icu_on_off_MAX) {
1,310✔
2331
                tnt_raise(ClientError, ER_CANT_CREATE_COLLATION,
1✔
2332
                          "ICU wrong french_collation option setting, "
2333
                                  "expected ON | OFF");
2334
        }
2335

2336
        if (base->icu.alternate_handling == coll_icu_alternate_handling_MAX) {
1,309✔
2337
                tnt_raise(ClientError, ER_CANT_CREATE_COLLATION,
1✔
2338
                          "ICU wrong alternate_handling option setting, "
2339
                                  "expected NON_IGNORABLE | SHIFTED");
2340
        }
2341

2342
        if (base->icu.case_first == coll_icu_case_first_MAX) {
1,308✔
2343
                tnt_raise(ClientError, ER_CANT_CREATE_COLLATION,
1✔
2344
                          "ICU wrong case_first option setting, "
2345
                                  "expected OFF | UPPER_FIRST | LOWER_FIRST");
2346
        }
2347

2348
        if (base->icu.case_level == coll_icu_on_off_MAX) {
1,307✔
2349
                tnt_raise(ClientError, ER_CANT_CREATE_COLLATION,
2✔
2350
                          "ICU wrong case_level option setting, "
2351
                                  "expected ON | OFF");
2352
        }
2353

2354
        if (base->icu.normalization_mode == coll_icu_on_off_MAX) {
1,305✔
2355
                tnt_raise(ClientError, ER_CANT_CREATE_COLLATION,
1✔
2356
                          "ICU wrong normalization_mode option setting, "
2357
                                  "expected ON | OFF");
2358
        }
2359

2360
        if (base->icu.strength == coll_icu_strength_MAX) {
1,304✔
2361
                tnt_raise(ClientError, ER_CANT_CREATE_COLLATION,
1✔
2362
                          "ICU wrong strength option setting, "
2363
                                  "expected PRIMARY | SECONDARY | "
2364
                                  "TERTIARY | QUATERNARY | IDENTICAL");
2365
        }
2366

2367
        if (base->icu.numeric_collation == coll_icu_on_off_MAX) {
1,303✔
2368
                tnt_raise(ClientError, ER_CANT_CREATE_COLLATION,
1✔
2369
                          "ICU wrong numeric_collation option setting, "
2370
                                  "expected ON | OFF");
2371
        }
2372
}
1,302✔
2373

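/*
 * The options validated above (strength, french_collation,
 * alternate_handling, case_first, case_level, normalization_mode,
 * numeric_collation) correspond to standard ICU collator attributes. A
 * standalone sketch of applying such settings with the plain ICU C API
 * (an editor's illustration; the locale and values are arbitrary
 * examples, and the actual collator construction from a coll_def
 * happens elsewhere):
 *
 *   g++ icu_coll_sketch.cc -licui18n -licuuc
 */
#include <unicode/ucol.h>
#include <cstdio>

int main()
{
	UErrorCode status = U_ZERO_ERROR;
	/* The locale string plays the role of BOX_COLLATION_FIELD_LOCALE. */
	UCollator *coll = ucol_open("ru_RU", &status);
	if (U_FAILURE(status)) {
		fprintf(stderr, "ucol_open: %s\n", u_errorName(status));
		return 1;
	}
	/* strength = SECONDARY: ignore case, keep accent differences. */
	ucol_setAttribute(coll, UCOL_STRENGTH, UCOL_SECONDARY, &status);
	/* alternate_handling = NON_IGNORABLE, case_first/case_level = OFF. */
	ucol_setAttribute(coll, UCOL_ALTERNATE_HANDLING, UCOL_NON_IGNORABLE,
			  &status);
	ucol_setAttribute(coll, UCOL_CASE_FIRST, UCOL_OFF, &status);
	ucol_setAttribute(coll, UCOL_CASE_LEVEL, UCOL_OFF, &status);
	/* numeric_collation = ON: "item9" sorts before "item10". */
	ucol_setAttribute(coll, UCOL_NUMERIC_COLLATION, UCOL_ON, &status);
	if (U_FAILURE(status))
		fprintf(stderr, "ucol_setAttribute: %s\n",
			u_errorName(status));
	ucol_close(coll);
	return 0;
}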
2374
/**
2375
 * Rollback a change in the collation space.
2376
 * A change is only INSERT or DELETE; UPDATE is not supported.
2377
 */
2378
static void
2379
coll_id_cache_rollback(struct trigger *trigger, void *event)
×
2380
{
2381
        struct coll_id *coll_id = (struct coll_id *) trigger->data;
×
2382
        struct txn_stmt *stmt = txn_last_stmt((struct txn*) event);
×
2383

2384
        if (stmt->new_tuple == NULL) {
×
2385
                /* DELETE: put the collation identifier back. */
2386
                assert(stmt->old_tuple != NULL);
×
2387
                struct coll_id *replaced_id;
2388
                if (coll_id_cache_replace(coll_id, &replaced_id) != 0) {
×
2389
                        panic("Out of memory on insertion into collation "\
×
2390
                              "cache");
2391
                }
2392
                assert(replaced_id == NULL);
×
2393
        } else {
2394
                /* INSERT: delete the new collation identifier. */
2395
                assert(stmt->old_tuple == NULL);
×
2396
                coll_id_cache_delete(coll_id);
×
2397
                coll_id_delete(coll_id);
×
2398
        }
2399
}
×
2400

2401

2402
/** Free a deleted collation identifier on commit. */
2403
static void
2404
coll_id_cache_commit(struct trigger *trigger, void *event)
98✔
2405
{
2406
        (void) event;
2407
        struct coll_id *coll_id = (struct coll_id *) trigger->data;
98✔
2408
        coll_id_delete(coll_id);
98✔
2409
}
98✔
2410

2411
/**
2412
 * A trigger invoked on replace in a space containing
2413
 * user-defined collations.
2414
 */
2415
static void
2416
on_replace_dd_collation(struct trigger * /* trigger */, void *event)
1,433✔
2417
{
2418
        struct txn *txn = (struct txn *) event;
1,433✔
2419
        struct txn_stmt *stmt = txn_current_stmt(txn);
1,433✔
2420
        struct tuple *old_tuple = stmt->old_tuple;
1,433✔
2421
        struct tuple *new_tuple = stmt->new_tuple;
1,433✔
2422
        txn_check_singlestatement_xc(txn, "Space _collation");
1,433✔
2423
        struct trigger *on_rollback =
2424
                txn_alter_trigger_new(coll_id_cache_rollback, NULL);
1,432✔
2425
        struct trigger *on_commit =
2426
                txn_alter_trigger_new(coll_id_cache_commit, NULL);
1,432✔
2427
        if (new_tuple == NULL && old_tuple != NULL) {
1,432✔
2428
                /* DELETE */
2429
                /*
2430
                 * TODO: Check that no index uses the collation
2431
                 * identifier.
2432
                 */
2433
                int32_t old_id = tuple_field_u32_xc(old_tuple,
99✔
2434
                                                    BOX_COLLATION_FIELD_ID);
99✔
2435
                struct coll_id *old_coll_id = coll_by_id(old_id);
99✔
2436
                assert(old_coll_id != NULL);
99✔
2437
                access_check_ddl(old_coll_id->name, old_coll_id->owner_id,
99✔
2438
                                 SC_COLLATION, PRIV_D, false);
99✔
2439
                /*
2440
                 * Set on_commit/on_rollback triggers after
2441
                 * deletion from the cache to make trigger logic
2442
                 * simple.
2443
                 */
2444
                coll_id_cache_delete(old_coll_id);
98✔
2445
                on_rollback->data = old_coll_id;
98✔
2446
                on_commit->data = old_coll_id;
98✔
2447
                txn_on_rollback(txn, on_rollback);
98✔
2448
                txn_on_commit(txn, on_commit);
98✔
2449
        } else if (new_tuple != NULL && old_tuple == NULL) {
1,333✔
2450
                /* INSERT */
2451
                struct coll_id_def new_def;
2452
                coll_id_def_new_from_tuple(new_tuple, &new_def);
1,332✔
2453
                access_check_ddl(new_def.name, new_def.owner_id, SC_COLLATION,
1,302✔
2454
                                 PRIV_C, false);
1,302✔
2455
                struct coll_id *new_coll_id = coll_id_new(&new_def);
1,302✔
2456
                if (new_coll_id == NULL)
1,302✔
2457
                        diag_raise();
×
2458
                struct coll_id *replaced_id;
2459
                if (coll_id_cache_replace(new_coll_id, &replaced_id) != 0) {
1,302✔
2460
                        coll_id_delete(new_coll_id);
×
2461
                        diag_raise();
×
2462
                }
2463
                assert(replaced_id == NULL);
1,302✔
2464
                on_rollback->data = new_coll_id;
1,302✔
2465
                txn_on_rollback(txn, on_rollback);
1,302✔
2466
        } else {
2467
                /* UPDATE */
2468
                assert(new_tuple != NULL && old_tuple != NULL);
1✔
2469
                tnt_raise(ClientError, ER_UNSUPPORTED, "collation", "alter");
1✔
2470
        }
2471
}
1,400✔
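/*
 * Editor's sketch (illustrative, not part of the original file): the DDL
 * trigger pattern used above and by most triggers in this file. The
 * in-memory cache is updated right away in the on_replace trigger, an
 * on_rollback trigger undoes the change if the WAL write fails, and an
 * on_commit trigger frees replaced/deleted objects once the change is
 * durable. A minimal insert-side sketch, with `obj`, `obj_rollback` and
 * `cache_insert` as stand-in names:
 */
#if 0
        struct trigger *on_rollback =
                txn_alter_trigger_new(obj_rollback, obj);
        cache_insert(obj);                 /* publish immediately */
        txn_on_rollback(txn, on_rollback); /* undo if the WAL write fails */
#endif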
2472

2473
/**
2474
 * Create a privilege definition from tuple.
2475
 */
2476
void
2477
priv_def_create_from_tuple(struct priv_def *priv, struct tuple *tuple)
62,354✔
2478
{
2479
        priv->grantor_id = tuple_field_u32_xc(tuple, BOX_PRIV_FIELD_ID);
62,354✔
2480
        priv->grantee_id = tuple_field_u32_xc(tuple, BOX_PRIV_FIELD_UID);
62,354✔
2481
        const char *object_type =
2482
                tuple_field_cstr_xc(tuple, BOX_PRIV_FIELD_OBJECT_TYPE);
62,354✔
2483
        priv->object_id = tuple_field_u32_xc(tuple, BOX_PRIV_FIELD_OBJECT_ID);
62,354✔
2484
        priv->object_type = schema_object_type(object_type);
62,354✔
2485
        if (priv->object_type == SC_UNKNOWN) {
62,354✔
2486
                tnt_raise(ClientError, ER_UNKNOWN_SCHEMA_OBJECT,
×
2487
                          object_type);
2488
        }
2489
        priv->access = tuple_field_u32_xc(tuple, BOX_PRIV_FIELD_ACCESS);
62,354✔
2490
}
62,354✔
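/*
 * Editor's note (illustrative, not part of the original file): given the
 * field order parsed above, a _priv tuple has the shape
 *     {grantor_id, grantee_id, object_type, object_id, access}
 * so, for example, {1, 5, 'space', 512, PRIV_R | PRIV_W} would describe
 * read/write access on space 512 granted to user 5 by user 1 (the concrete
 * ids here are made up for the example).
 */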
2491

2492
/*
2493
 * This function checks that:
2494
 * - a privilege is granted from an existing user to an existing
2495
 *   user on an existing object
2496
 * - the grantor has the right to grant (is the owner of the object)
2497
 *
2498
 * @XXX Potentially there is a race in case of rollback, since an
2499
 * object can be changed during WAL write.
2500
 * In the future we must protect grant/revoke with a logical lock.
2501
 */
2502
static void
2503
priv_def_check(struct priv_def *priv, enum priv_type priv_type)
11,603✔
2504
{
2505
        struct user *grantor = user_find_xc(priv->grantor_id);
11,603✔
2506
        /* May be a role */
2507
        struct user *grantee = user_by_id(priv->grantee_id);
11,603✔
2508
        if (grantee == NULL) {
11,603✔
2509
                tnt_raise(ClientError, ER_NO_SUCH_USER,
×
2510
                          int2str(priv->grantee_id));
2511
        }
2512
        const char *name = schema_find_name(priv->object_type, priv->object_id);
11,603✔
2513
        access_check_ddl(name, grantor->def->uid, priv->object_type, priv_type,
11,603✔
2514
                         false);
11,603✔
2515
        switch (priv->object_type) {
11,601✔
2516
        case SC_UNIVERSE:
2517
                if (grantor->def->uid != ADMIN) {
3,484✔
2518
                        tnt_raise(AccessDeniedError,
1✔
2519
                                  priv_name(priv_type),
2520
                                  schema_object_name(SC_UNIVERSE),
2521
                                  name,
2522
                                  grantor->def->name);
2523
                }
2524
                break;
3,483✔
2525
        case SC_SPACE:
2526
        {
2527
                struct space *space = space_cache_find_xc(priv->object_id);
5,614✔
2528
                if (space->def->uid != grantor->def->uid &&
5,615✔
2529
                    grantor->def->uid != ADMIN) {
1✔
2530
                        tnt_raise(AccessDeniedError,
1✔
2531
                                  priv_name(priv_type),
2532
                                  schema_object_name(SC_SPACE), name,
2533
                                  grantor->def->name);
2534
                }
2535
                break;
5,613✔
2536
        }
2537
        case SC_FUNCTION:
2538
        {
2539
                struct func *func = func_cache_find(priv->object_id);
856✔
2540
                if (func->def->uid != grantor->def->uid &&
867✔
2541
                    grantor->def->uid != ADMIN) {
11✔
2542
                        tnt_raise(AccessDeniedError,
×
2543
                                  priv_name(priv_type),
2544
                                  schema_object_name(SC_FUNCTION), name,
2545
                                  grantor->def->name);
2546
                }
2547
                break;
856✔
2548
        }
2549
        case SC_SEQUENCE:
2550
        {
2551
                struct sequence *seq = sequence_cache_find(priv->object_id);
23✔
2552
                if (seq->def->uid != grantor->def->uid &&
24✔
2553
                    grantor->def->uid != ADMIN) {
1✔
2554
                        tnt_raise(AccessDeniedError,
1✔
2555
                                  priv_name(priv_type),
2556
                                  schema_object_name(SC_SEQUENCE), name,
2557
                                  grantor->def->name);
2558
                }
2559
                break;
22✔
2560
        }
2561
        case SC_ROLE:
2562
        {
2563
                struct user *role = user_by_id(priv->object_id);
1,624✔
2564
                if (role == NULL || role->def->type != SC_ROLE) {
1,624✔
2565
                        tnt_raise(ClientError, ER_NO_SUCH_ROLE,
×
2566
                                  role ? role->def->name :
2567
                                  int2str(priv->object_id));
2568
                }
2569
                /*
2570
                 * Only the creator of the role can grant or revoke it.
2571
                 * Everyone can grant 'PUBLIC' role.
2572
                 */
2573
                if (role->def->owner != grantor->def->uid &&
1,641✔
2574
                    grantor->def->uid != ADMIN &&
32✔
2575
                    (role->def->uid != PUBLIC || priv->access != PRIV_X)) {
29✔
2576
                        tnt_raise(AccessDeniedError,
1✔
2577
                                  priv_name(priv_type),
2578
                                  schema_object_name(SC_ROLE), name,
2579
                                  grantor->def->name);
2580
                }
2581
                /* Not necessary to do during revoke, but who cares. */
2582
                role_check(grantee, role);
1,623✔
2583
        }
2584
        default:
2585
                break;
1,619✔
2586
        }
2587
        if (priv->access == 0) {
11,593✔
2588
                tnt_raise(ClientError, ER_GRANT,
1✔
2589
                          "the grant tuple has no privileges");
2590
        }
2591
}
11,592✔
2592

2593
/**
2594
 * Update a metadata cache object with the new access
2595
 * data.
2596
 */
2597
static void
2598
grant_or_revoke(struct priv_def *priv)
11,592✔
2599
{
2600
        struct user *grantee = user_by_id(priv->grantee_id);
11,592✔
2601
        if (grantee == NULL)
11,592✔
2602
                return;
×
2603
        if (priv->object_type == SC_ROLE) {
11,592✔
2604
                struct user *role = user_by_id(priv->object_id);
1,619✔
2605
                if (role == NULL || role->def->type != SC_ROLE)
1,619✔
2606
                        return;
×
2607
                if (priv->access)
1,619✔
2608
                        role_grant(grantee, role);
1,228✔
2609
                else
2610
                        role_revoke(grantee, role);
391✔
2611
        } else {
2612
                priv_grant(grantee, priv);
9,973✔
2613
        }
2614
}
2615

2616
/** A trigger called on rollback of grant, or on commit of revoke. */
2617
static void
2618
revoke_priv(struct trigger * /* trigger */, void *event)
817✔
2619
{
2620
        struct txn *txn = (struct txn *) event;
817✔
2621
        struct txn_stmt *stmt = txn_last_stmt(txn);
817✔
2622
        struct tuple *tuple = (stmt->new_tuple ?
817✔
2623
                               stmt->new_tuple : stmt->old_tuple);
817✔
2624
        struct priv_def priv;
2625
        priv_def_create_from_tuple(&priv, tuple);
817✔
2626
        /*
2627
         * Access to the object has been removed altogether so
2628
         * there should be no grants at all. If only some grants
2629
         * were removed, modify_priv trigger would have been
2630
         * invoked.
2631
         */
2632
        priv.access = 0;
817✔
2633
        grant_or_revoke(&priv);
817✔
2634
}
817✔
2635

2636
/** A trigger called on rollback of grant, or on commit of revoke. */
2637
static void
2638
modify_priv(struct trigger * /* trigger */, void *event)
576✔
2639
{
2640
        struct txn_stmt *stmt = txn_last_stmt((struct txn *) event);
576✔
2641
        struct priv_def priv;
2642
        priv_def_create_from_tuple(&priv, stmt->new_tuple);
576✔
2643
        grant_or_revoke(&priv);
576✔
2644
}
576✔
2645

2646
/**
2647
 * A trigger invoked on replace in the space containing
2648
 * all granted privileges.
2649
 */
2650
static void
2651
on_replace_dd_priv(struct trigger * /* trigger */, void *event)
11,605✔
2652
{
2653
        struct txn *txn = (struct txn *) event;
11,605✔
2654
        txn_check_singlestatement_xc(txn, "Space _priv");
11,605✔
2655
        struct txn_stmt *stmt = txn_current_stmt(txn);
11,603✔
2656
        struct tuple *old_tuple = stmt->old_tuple;
11,603✔
2657
        struct tuple *new_tuple = stmt->new_tuple;
11,603✔
2658
        struct priv_def priv;
2659

2660
        if (new_tuple != NULL && old_tuple == NULL) {        /* grant */
11,603✔
2661
                priv_def_create_from_tuple(&priv, new_tuple);
10,206✔
2662
                /*
2663
                 * Add system privileges explicitly to the
2664
                 * universe grant issued prior to 1.7.7 in
2665
                 * case upgrade script has not been invoked.
2666
                 */
2667
                if (priv.object_type == SC_UNIVERSE &&
12,896✔
2668
                    dd_version_id < version_id(1, 7, 7)) {
2,690✔
2669

2670
                        priv.access |= PRIV_S;
55✔
2671
                        priv.access |= PRIV_U;
55✔
2672
                        /*
2673
                         * For admin we have to set his privileges
2674
                         * explicitly because he needs them in upgrade and
2675
                         * bootstrap script
2676
                         */
2677
                        if (priv.grantor_id == ADMIN) {
55✔
2678
                                priv.access = admin_credentials.universal_access;
55✔
2679
                        }
2680
                }
2681
                priv_def_check(&priv, PRIV_GRANT);
10,206✔
2682
                grant_or_revoke(&priv);
10,199✔
2683
                struct trigger *on_rollback =
2684
                        txn_alter_trigger_new(revoke_priv, NULL);
10,199✔
2685
                txn_on_rollback(txn, on_rollback);
10,199✔
2686
        } else if (new_tuple == NULL) {                /* revoke */
1,397✔
2687
                assert(old_tuple);
819✔
2688
                priv_def_create_from_tuple(&priv, old_tuple);
819✔
2689
                priv_def_check(&priv, PRIV_REVOKE);
819✔
2690
                struct trigger *on_commit =
2691
                        txn_alter_trigger_new(revoke_priv, NULL);
817✔
2692
                txn_on_commit(txn, on_commit);
817✔
2693
        } else {                                       /* modify */
2694
                priv_def_create_from_tuple(&priv, new_tuple);
578✔
2695
                priv_def_check(&priv, PRIV_GRANT);
578✔
2696
                struct trigger *on_commit =
2697
                        txn_alter_trigger_new(modify_priv, NULL);
576✔
2698
                txn_on_commit(txn, on_commit);
576✔
2699
        }
2700
}
11,592✔
2701

2702
/* }}} access control */
2703

2704
/* {{{ cluster configuration */
2705

2706
/**
2707
 * This trigger is invoked only upon initial recovery, when
2708
 * reading contents of the system spaces from the snapshot.
2709
 *
2710
 * Before a cluster is assigned a cluster id, it is read-only.
2711
 * Since the state of the WAL doesn't concern us during recovery,
2712
 * we can safely change the cluster id in the before-replace
2713
 * event rather than in the after-replace event.
2714
 */
2715
static void
2716
on_replace_dd_schema(struct trigger * /* trigger */, void *event)
71,246✔
2717
{
2718
        struct txn *txn = (struct txn *) event;
71,246✔
2719
        txn_check_singlestatement_xc(txn, "Space _schema");
71,246✔
2720
        struct txn_stmt *stmt = txn_current_stmt(txn);
71,243✔
2721
        struct tuple *old_tuple = stmt->old_tuple;
71,243✔
2722
        struct tuple *new_tuple = stmt->new_tuple;
71,243✔
2723
        const char *key = tuple_field_cstr_xc(new_tuple ? new_tuple : old_tuple,
71,243✔
2724
                                              BOX_SCHEMA_FIELD_KEY);
71,243✔
2725
        if (strcmp(key, "cluster") == 0) {
71,243✔
2726
                if (new_tuple == NULL)
791✔
2727
                        tnt_raise(ClientError, ER_REPLICASET_UUID_IS_RO);
×
2728
                tt_uuid uu;
2729
                tuple_field_uuid_xc(new_tuple, BOX_CLUSTER_FIELD_UUID, &uu);
791✔
2730
                REPLICASET_UUID = uu;
791✔
2731
        } else if (strcmp(key, "version") == 0) {
70,452✔
2732
                if (new_tuple != NULL) {
653✔
2733
                        uint32_t major, minor, patch;
2734
                        if (tuple_field_u32(new_tuple, 1, &major) != 0 ||
1,304✔
2735
                            tuple_field_u32(new_tuple, 2, &minor) != 0)
652✔
2736
                                tnt_raise(ClientError, ER_WRONG_DD_VERSION);
×
2737
                        /* Version can be major.minor with no patch. */
2738
                        if (tuple_field_u32(new_tuple, 3, &patch) != 0)
652✔
2739
                                patch = 0;
4✔
2740
                        dd_version_id = version_id(major, minor, patch);
652✔
2741
                } else {
2742
                        assert(old_tuple != NULL);
1✔
2743
                        /*
2744
                         * _schema:delete({'version'}) for
2745
                         * example, for box.internal.bootstrap().
2746
                         */
2747
                        dd_version_id = tarantool_version_id();
1✔
2748
                }
2749
        }
2750
}
71,243✔
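/*
 * Editor's note (illustrative, not part of the original file): for the
 * "version" key above, a tuple like {'version', 1, 7, 7} sets
 * dd_version_id to version_id(1, 7, 7), and {'version', 1, 7} with no
 * patch field is treated as version_id(1, 7, 0).
 */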
2751

2752
/**
2753
 * A record with the id of the new instance has been synced to the
2754
 * write-ahead log. Update the cluster configuration cache
2755
 * with it.
2756
 */
2757
static void
2758
on_commit_dd_cluster(struct trigger *trigger, void *event)
4,851✔
2759
{
2760
        (void) trigger;
2761
        struct txn_stmt *stmt = txn_last_stmt((struct txn *) event);
4,851✔
2762
        struct tuple *new_tuple = stmt->new_tuple;
4,851✔
2763
        struct tuple *old_tuple = stmt->old_tuple;
4,851✔
2764

2765
        if (new_tuple == NULL) {
4,851✔
2766
                struct tt_uuid old_uuid;
2767
                tuple_field_uuid_xc(stmt->old_tuple, BOX_CLUSTER_FIELD_UUID,
1,346✔
2768
                                    &old_uuid);
1,346✔
2769
                struct replica *replica = replica_by_uuid(&old_uuid);
1,346✔
2770
                assert(replica != NULL);
1,346✔
2771
                replica_clear_id(replica);
1,346✔
2772
                return;
1,346✔
2773
        } else if (old_tuple != NULL) {
3,505✔
2774
                return; /* nothing to change */
2✔
2775
        }
2776

2777
        uint32_t id = tuple_field_u32_xc(new_tuple, BOX_CLUSTER_FIELD_ID);
3,503✔
2778
        tt_uuid uuid;
2779
        tuple_field_uuid_xc(new_tuple, BOX_CLUSTER_FIELD_UUID, &uuid);
3,503✔
2780
        struct replica *replica = replica_by_uuid(&uuid);
3,503✔
2781
        if (replica != NULL) {
3,503✔
2782
                replica_set_id(replica, id);
263✔
2783
        } else {
2784
                try {
2785
                        replica = replicaset_add(id, &uuid);
3,240✔
2786
                        /* Can't throw exceptions from on_commit trigger */
2787
                } catch(Exception *e) {
×
2788
                        panic("Can't register replica: %s", e->errmsg);
×
2789
                }
2790
        }
2791
}
2792

2793
/**
2794
 * A trigger invoked on replace in the space _cluster,
2795
 * which contains cluster configuration.
2796
 *
2797
 * This space is modified by JOIN command in IPROTO
2798
 * protocol.
2799
 *
2800
 * The trigger updates the cluster configuration cache
2801
 * with uuid of the newly joined instance.
2802
 *
2803
 * During recovery, it acts the same way, loading identifiers
2804
 * of all instances into the cache. Instance globally unique
2805
 * identifiers are used to keep track of cluster configuration,
2806
 * so that a replica that previously joined a replica set can
2807
 * follow updates, and a replica that belongs to a different
2808
 * replica set can not by mistake join/follow another replica
2809
 * set without first being reset (emptied).
2810
 */
2811
static void
2812
on_replace_dd_cluster(struct trigger *trigger, void *event)
4,930✔
2813
{
2814
        (void) trigger;
2815
        struct txn *txn = (struct txn *) event;
4,930✔
2816
        txn_check_singlestatement_xc(txn, "Space _cluster");
4,930✔
2817
        struct txn_stmt *stmt = txn_current_stmt(txn);
4,929✔
2818
        struct tuple *old_tuple = stmt->old_tuple;
4,929✔
2819
        struct tuple *new_tuple = stmt->new_tuple;
4,929✔
2820
        if (new_tuple != NULL) { /* Insert or replace */
4,929✔
2821
                /* Check fields */
2822
                uint32_t replica_id =
2823
                        tuple_field_u32_xc(new_tuple, BOX_CLUSTER_FIELD_ID);
3,511✔
2824
                replica_check_id(replica_id);
3,511✔
2825
                tt_uuid replica_uuid;
2826
                tuple_field_uuid_xc(new_tuple, BOX_CLUSTER_FIELD_UUID,
2827
                                    &replica_uuid);
3,506✔
2828
                if (tt_uuid_is_nil(&replica_uuid))
3,506✔
2829
                        tnt_raise(ClientError, ER_INVALID_UUID,
×
2830
                                  tt_uuid_str(&replica_uuid));
2831
                if (old_tuple != NULL) {
3,506✔
2832
                        /*
2833
                         * Forbid changes of UUID for a registered instance:
2834
                         * it requires an extra effort to keep _cluster
2835
                         * in sync with appliers and relays.
2836
                         */
2837
                        tt_uuid old_uuid;
2838
                        tuple_field_uuid_xc(old_tuple, BOX_CLUSTER_FIELD_UUID,
2839
                                            &old_uuid);
3✔
2840
                        if (!tt_uuid_is_equal(&replica_uuid, &old_uuid)) {
3✔
2841
                                tnt_raise(ClientError, ER_UNSUPPORTED,
1✔
2842
                                          "Space _cluster",
2843
                                          "updates of instance uuid");
2844
                        }
2845
                }
2846
        } else {
2847
                /*
2848
                 * Don't allow deletion of the record for this instance
2849
                 * from _cluster.
2850
                 */
2851
                assert(old_tuple != NULL);
1,418✔
2852
                uint32_t replica_id =
2853
                        tuple_field_u32_xc(old_tuple, BOX_CLUSTER_FIELD_ID);
1,418✔
2854
                replica_check_id(replica_id);
1,418✔
2855
        }
2856

2857
        struct trigger *on_commit =
2858
                        txn_alter_trigger_new(on_commit_dd_cluster, NULL);
4,851✔
2859
        txn_on_commit(txn, on_commit);
4,851✔
2860
}
4,851✔
2861

2862
/* }}} cluster configuration */
2863

2864
/* {{{ sequence */
2865

2866
/** Create a sequence definition from a tuple. */
2867
static struct sequence_def *
2868
sequence_def_new_from_tuple(struct tuple *tuple, uint32_t errcode)
129✔
2869
{
2870
        uint32_t name_len;
2871
        const char *name = tuple_field_str_xc(tuple, BOX_USER_FIELD_NAME,
2872
                                              &name_len);
129✔
2873
        if (name_len > BOX_NAME_MAX) {
129✔
2874
                tnt_raise(ClientError, errcode,
1✔
2875
                          tt_cstr(name, BOX_INVALID_NAME_MAX),
2876
                          "sequence name is too long");
2877
        }
2878
        identifier_check_xc(name, name_len);
128✔
2879
        size_t sz = sequence_def_sizeof(name_len);
113✔
2880
        struct sequence_def *def = (struct sequence_def *) malloc(sz);
113✔
2881
        if (def == NULL)
113✔
2882
                tnt_raise(OutOfMemory, sz, "malloc", "sequence");
×
2883
        auto def_guard = make_scoped_guard([=] { free(def); });
232✔
2884
        memcpy(def->name, name, name_len);
113✔
2885
        def->name[name_len] = '\0';
113✔
2886
        def->id = tuple_field_u32_xc(tuple, BOX_SEQUENCE_FIELD_ID);
113✔
2887
        def->uid = tuple_field_u32_xc(tuple, BOX_SEQUENCE_FIELD_UID);
113✔
2888
        def->step = tuple_field_i64_xc(tuple, BOX_SEQUENCE_FIELD_STEP);
113✔
2889
        def->min = tuple_field_i64_xc(tuple, BOX_SEQUENCE_FIELD_MIN);
113✔
2890
        def->max = tuple_field_i64_xc(tuple, BOX_SEQUENCE_FIELD_MAX);
113✔
2891
        def->start = tuple_field_i64_xc(tuple, BOX_SEQUENCE_FIELD_START);
113✔
2892
        def->cache = tuple_field_i64_xc(tuple, BOX_SEQUENCE_FIELD_CACHE);
113✔
2893
        def->cycle = tuple_field_bool_xc(tuple, BOX_SEQUENCE_FIELD_CYCLE);
113✔
2894
        if (def->step == 0)
113✔
2895
                tnt_raise(ClientError, errcode, def->name,
2✔
2896
                          "step option must be non-zero");
2897
        if (def->min > def->max)
111✔
2898
                tnt_raise(ClientError, errcode, def->name,
2✔
2899
                          "max must be greater than or equal to min");
2900
        if (def->start < def->min || def->start > def->max)
109✔
2901
                tnt_raise(ClientError, errcode, def->name,
2✔
2902
                          "start must be between min and max");
2903
        def_guard.is_active = false;
107✔
2904
        return def;
214✔
2905
}
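/*
 * Editor's note (illustrative, not part of the original file): the checks
 * above accept, for example, step = 1, min = 1, max = INT64_MAX, start = 1,
 * while step = 0, min > max, or a start outside [min, max] raises errcode
 * (ER_CREATE_SEQUENCE or ER_ALTER_SEQUENCE, depending on the caller below).
 */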
2906

2907
/** Argument passed to on_commit_dd_sequence() trigger. */
2908
struct alter_sequence {
2909
        /** Trigger invoked on commit in the _sequence space. */
2910
        struct trigger on_commit;
2911
        /** Trigger invoked on rollback in the _sequence space. */
2912
        struct trigger on_rollback;
2913
        /** Old sequence definition or NULL if create. */
2914
        struct sequence_def *old_def;
2915
        /** New sequence definition or NULL if drop. */
2916
        struct sequence_def *new_def;
2917
};
2918

2919
/**
2920
 * Trigger invoked on commit in the _sequence space.
2921
 */
2922
static void
2923
on_commit_dd_sequence(struct trigger *trigger, void *event)
195✔
2924
{
2925
        struct txn *txn = (struct txn *) event;
195✔
2926
        struct alter_sequence *alter = (struct alter_sequence *) trigger->data;
195✔
2927

2928
        if (alter->new_def != NULL && alter->old_def != NULL) {
195✔
2929
                /* Alter a sequence. */
2930
                sequence_cache_replace(alter->new_def);
11✔
2931
        } else if (alter->new_def == NULL) {
184✔
2932
                /* Drop a sequence. */
2933
                sequence_cache_delete(alter->old_def->id);
89✔
2934
        }
2935

2936
        trigger_run_xc(&on_alter_sequence, txn_last_stmt(txn));
195✔
2937
}
195✔
2938

2939
/**
2940
 * Trigger invoked on rollback in the _sequence space.
2941
 */
2942
static void
2943
on_rollback_dd_sequence(struct trigger *trigger, void * /* event */)
×
2944
{
2945
        struct alter_sequence *alter = (struct alter_sequence *) trigger->data;
×
2946

2947
        if (alter->new_def != NULL && alter->old_def == NULL) {
×
2948
                /* Rollback creation of a sequence. */
2949
                sequence_cache_delete(alter->new_def->id);
×
2950
        }
2951
}
×
2952

2953
/**
2954
 * A trigger invoked on replace in space _sequence.
2955
 * Used to alter a sequence definition.
2956
 */
2957
static void
2958
on_replace_dd_sequence(struct trigger * /* trigger */, void *event)
224✔
2959
{
2960
        struct txn *txn = (struct txn *) event;
224✔
2961
        txn_check_singlestatement_xc(txn, "Space _sequence");
224✔
2962
        struct txn_stmt *stmt = txn_current_stmt(txn);
224✔
2963
        struct tuple *old_tuple = stmt->old_tuple;
224✔
2964
        struct tuple *new_tuple = stmt->new_tuple;
224✔
2965

2966
        struct alter_sequence *alter =
2967
                region_calloc_object_xc(&fiber()->gc, struct alter_sequence);
224✔
2968

2969
        struct sequence_def *new_def = NULL;
224✔
2970
        auto def_guard = make_scoped_guard([=] { free(new_def); });
477✔
2971

2972
        if (old_tuple == NULL && new_tuple != NULL) {                /* INSERT */
224✔
2973
                new_def = sequence_def_new_from_tuple(new_tuple,
2974
                                                      ER_CREATE_SEQUENCE);
114✔
2975
                assert(sequence_by_id(new_def->id) == NULL);
95✔
2976
                sequence_cache_replace(new_def);
95✔
2977
                alter->new_def = new_def;
95✔
2978
        } else if (old_tuple != NULL && new_tuple == NULL) {        /* DELETE */
110✔
2979
                uint32_t id = tuple_field_u32_xc(old_tuple,
2980
                                                 BOX_SEQUENCE_DATA_FIELD_ID);
95✔
2981
                struct sequence *seq = sequence_by_id(id);
95✔
2982
                assert(seq != NULL);
95✔
2983
                access_check_ddl(seq->def->name, seq->def->uid, SC_SEQUENCE,
95✔
2984
                                 PRIV_D, false);
95✔
2985
                if (space_has_data(BOX_SEQUENCE_DATA_ID, 0, id))
94✔
2986
                        tnt_raise(ClientError, ER_DROP_SEQUENCE,
1✔
2987
                                  seq->def->name, "the sequence has data");
2988
                if (space_has_data(BOX_SPACE_SEQUENCE_ID, 1, id))
93✔
2989
                        tnt_raise(ClientError, ER_DROP_SEQUENCE,
3✔
2990
                                  seq->def->name, "the sequence is in use");
2991
                if (schema_find_grants("sequence", seq->def->id))
90✔
2992
                        tnt_raise(ClientError, ER_DROP_SEQUENCE,
1✔
2993
                                  seq->def->name, "the sequence has grants");
2994
                alter->old_def = seq->def;
89✔
2995
        } else {                                                /* UPDATE */
2996
                new_def = sequence_def_new_from_tuple(new_tuple,
2997
                                                      ER_ALTER_SEQUENCE);
15✔
2998
                struct sequence *seq = sequence_by_id(new_def->id);
12✔
2999
                assert(seq != NULL);
12✔
3000
                access_check_ddl(seq->def->name, seq->def->uid, SC_SEQUENCE,
12✔
3001
                                 PRIV_A, false);
12✔
3002
                alter->old_def = seq->def;
11✔
3003
                alter->new_def = new_def;
11✔
3004
        }
3005

3006
        def_guard.is_active = false;
195✔
3007

3008
        trigger_create(&alter->on_commit,
195✔
3009
                       on_commit_dd_sequence, alter, NULL);
195✔
3010
        txn_on_commit(txn, &alter->on_commit);
195✔
3011
        trigger_create(&alter->on_rollback,
195✔
3012
                       on_rollback_dd_sequence, alter, NULL);
195✔
3013
        txn_on_rollback(txn, &alter->on_rollback);
195✔
3014
}
195✔
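/*
 * Editor's note (illustrative summary, not part of the original file): note
 * the asymmetry above: a newly created sequence is put into the cache right
 * away (and removed again by on_rollback_dd_sequence), while an altered or
 * dropped definition only reaches the cache in on_commit_dd_sequence(),
 * i.e. after the change is durable.
 */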
3015

3016
/**
3017
 * A trigger invoked on replace in space _sequence_data.
3018
 * Used to update a sequence value.
3019
 */
3020
static void
3021
on_replace_dd_sequence_data(struct trigger * /* trigger */, void *event)
198✔
3022
{
3023
        struct txn *txn = (struct txn *) event;
198✔
3024
        struct txn_stmt *stmt = txn_current_stmt(txn);
198✔
3025
        struct tuple *old_tuple = stmt->old_tuple;
198✔
3026
        struct tuple *new_tuple = stmt->new_tuple;
198✔
3027

3028
        uint32_t id = tuple_field_u32_xc(old_tuple ?: new_tuple,
198✔
3029
                                         BOX_SEQUENCE_DATA_FIELD_ID);
198✔
3030
        struct sequence *seq = sequence_cache_find(id);
198✔
3031
        if (seq == NULL)
198✔
3032
                diag_raise();
×
3033
        if (new_tuple != NULL) {                        /* INSERT, UPDATE */
198✔
3034
                int64_t value = tuple_field_i64_xc(new_tuple,
3035
                                BOX_SEQUENCE_DATA_FIELD_VALUE);
125✔
3036
                if (sequence_set(seq, value) != 0)
125✔
3037
                        diag_raise();
×
3038
        } else {                                        /* DELETE */
3039
                sequence_reset(seq);
73✔
3040
        }
3041
}
198✔
3042

3043
/**
3044
 * Run the triggers registered on commit of a change in _space.
3045
 */
3046
static void
3047
on_commit_dd_space_sequence(struct trigger *trigger, void * /* event */)
50✔
3048
{
3049
        struct space *space = (struct space *) trigger->data;
50✔
3050
        trigger_run_xc(&on_alter_space, space);
50✔
3051
}
50✔
3052

3053
/**
3054
 * A trigger invoked on replace in space _space_sequence.
3055
 * Used to update space <-> sequence mapping.
3056
 */
3057
static void
3058
on_replace_dd_space_sequence(struct trigger * /* trigger */, void *event)
59✔
3059
{
3060
        struct txn *txn = (struct txn *) event;
59✔
3061
        txn_check_singlestatement_xc(txn, "Space _space_sequence");
59✔
3062
        struct txn_stmt *stmt = txn_current_stmt(txn);
59✔
3063
        struct tuple *tuple = stmt->new_tuple ? stmt->new_tuple : stmt->old_tuple;
59✔
3064

3065
        uint32_t space_id = tuple_field_u32_xc(tuple,
3066
                                               BOX_SPACE_SEQUENCE_FIELD_ID);
59✔
3067
        uint32_t sequence_id = tuple_field_u32_xc(tuple,
3068
                                BOX_SPACE_SEQUENCE_FIELD_SEQUENCE_ID);
59✔
3069
        bool is_generated = tuple_field_bool_xc(tuple,
3070
                                BOX_SPACE_SEQUENCE_FIELD_IS_GENERATED);
59✔
3071

3072
        struct space *space = space_cache_find_xc(space_id);
59✔
3073
        struct sequence *seq = sequence_cache_find(sequence_id);
59✔
3074

3075
        enum priv_type priv_type = stmt->new_tuple ? PRIV_C : PRIV_D;
59✔
3076
        if (stmt->new_tuple && stmt->old_tuple)
59✔
3077
                priv_type = PRIV_A;
4✔
3078

3079
        /* Check we have the correct access type on the sequence.  * */
3080
        access_check_ddl(seq->def->name, seq->def->uid, SC_SEQUENCE, priv_type,
59✔
3081
                         false);
59✔
3082
        /* Check that we have alter access on the space. */
3083
        access_check_ddl(space->def->name, space->def->uid, SC_SPACE, PRIV_A,
55✔
3084
                         false);
55✔
3085

3086
        struct trigger *on_commit =
3087
                txn_alter_trigger_new(on_commit_dd_space_sequence, space);
54✔
3088
        txn_on_commit(txn, on_commit);
54✔
3089

3090
        if (stmt->new_tuple != NULL) {                        /* INSERT, UPDATE */
54✔
3091
                struct index *pk = index_find_xc(space, 0);
32✔
3092
                index_def_check_sequence(pk->def, space_name(space));
31✔
3093
                if (seq->is_generated) {
30✔
3094
                        tnt_raise(ClientError, ER_ALTER_SPACE,
2✔
3095
                                  space_name(space),
3096
                                  "can not attach generated sequence");
3097
                }
3098
                seq->is_generated = is_generated;
28✔
3099
                space->sequence = seq;
28✔
3100
        } else {                                        /* DELETE */
3101
                assert(space->sequence == seq);
22✔
3102
                space->sequence = NULL;
22✔
3103
        }
3104
}
50✔
3105

3106
/* }}} sequence */
3107

3108
static void
3109
unlock_after_dd(struct trigger *trigger, void *event)
255,594✔
3110
{
3111
        (void) trigger;
3112
        (void) event;
3113
        latch_unlock(&schema_lock);
255,594✔
3114
        /*
3115
         * There may be a number of other fibers waiting on the latch. All
3116
         * of them should finish their work before the current fiber issues
3117
         * its next request. This is especially important for replication:
3118
         * if rows are applied out of order, the LSN order will be broken.
3119
         * Taking the latch one more time guarantees that all "queued"
3120
         * fibers have done their job before the current fiber wakes up
3121
         * next time. If there are no waiting fibers, the lock is taken
3122
         * without any yields.
3123
         */
3124
        latch_lock(&schema_lock);
255,594✔
3125
        latch_unlock(&schema_lock);
255,594✔
3126
}
255,594✔
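/*
 * Editor's note (assumption, not stated explicitly here): the trick above
 * relies on the latch waking waiters in queue order. If fibers B and C are
 * already waiting on schema_lock when fiber A unlocks it, A's second
 * latch_lock() puts A behind them, so by the time it returns B and C have
 * applied their rows and the LSN order is preserved.
 */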
3127

3128
static void
3129
lock_before_dd(struct trigger *trigger, void *event)
255,599✔
3130
{
3131
        (void) trigger;
3132
        if (fiber() == latch_owner(&schema_lock))
255,599✔
3133
                return;
×
3134
        struct txn *txn = (struct txn *)event;
255,599✔
3135
        /*
3136
         * This trigger is executed before any check and may yield
3137
         * on the latch lock. But a yield in a non-autocommit
3138
         * memtx transaction will roll it back silently, rather
3139
         * than produce an error, which is very confusing.
3140
         * So don't try to lock a latch if there is
3141
         * a multi-statement transaction.
3142
         */
3143
        txn_check_singlestatement_xc(txn, "DDL");
255,599✔
3144
        struct trigger *on_commit =
3145
                txn_alter_trigger_new(unlock_after_dd, NULL);
255,594✔
3146
        struct trigger *on_rollback =
3147
                txn_alter_trigger_new(unlock_after_dd, NULL);
255,594✔
3148
        /*
3149
         * Setting triggers doesn't fail. Lock the latch last
3150
         * to avoid leaking the latch in case of exception.
3151
         */
3152
        txn_on_commit(txn, on_commit);
255,594✔
3153
        txn_on_rollback(txn, on_rollback);
255,594✔
3154
        latch_lock(&schema_lock);
255,594✔
3155
}
3156

3157
struct trigger alter_space_on_replace_space = {
3158
        RLIST_LINK_INITIALIZER, on_replace_dd_space, NULL, NULL
3159
};
3160

3161
struct trigger alter_space_on_replace_index = {
3162
        RLIST_LINK_INITIALIZER, on_replace_dd_index, NULL, NULL
3163
};
3164

3165
struct trigger on_replace_truncate = {
3166
        RLIST_LINK_INITIALIZER, on_replace_dd_truncate, NULL, NULL
3167
};
3168

3169
struct trigger on_replace_schema = {
3170
        RLIST_LINK_INITIALIZER, on_replace_dd_schema, NULL, NULL
3171
};
3172

3173
struct trigger on_replace_user = {
3174
        RLIST_LINK_INITIALIZER, on_replace_dd_user, NULL, NULL
3175
};
3176

3177
struct trigger on_replace_func = {
3178
        RLIST_LINK_INITIALIZER, on_replace_dd_func, NULL, NULL
3179
};
3180

3181
struct trigger on_replace_collation = {
3182
        RLIST_LINK_INITIALIZER, on_replace_dd_collation, NULL, NULL
3183
};
3184

3185
struct trigger on_replace_priv = {
3186
        RLIST_LINK_INITIALIZER, on_replace_dd_priv, NULL, NULL
3187
};
3188

3189
struct trigger on_replace_cluster = {
3190
        RLIST_LINK_INITIALIZER, on_replace_dd_cluster, NULL, NULL
3191
};
3192

3193
struct trigger on_replace_sequence = {
3194
        RLIST_LINK_INITIALIZER, on_replace_dd_sequence, NULL, NULL
3195
};
3196

3197
struct trigger on_replace_sequence_data = {
3198
        RLIST_LINK_INITIALIZER, on_replace_dd_sequence_data, NULL, NULL
3199
};
3200

3201
struct trigger on_replace_space_sequence = {
3202
        RLIST_LINK_INITIALIZER, on_replace_dd_space_sequence, NULL, NULL
3203
};
3204

3205
struct trigger on_stmt_begin_space = {
3206
        RLIST_LINK_INITIALIZER, lock_before_dd, NULL, NULL
3207
};
3208

3209
struct trigger on_stmt_begin_index = {
3210
        RLIST_LINK_INITIALIZER, lock_before_dd, NULL, NULL
3211
};
3212

3213
struct trigger on_stmt_begin_truncate = {
3214
        RLIST_LINK_INITIALIZER, lock_before_dd, NULL, NULL
3215
};
3216

3217
/* vim: set foldmethod=marker */