
tarantool / tarantool / 12478858925

24 Dec 2024 07:59AM UTC coverage: 87.359% (-0.03%) from 87.392%

push · github · Buristan
ci: add paths filter to perf_micro workflow

The workflow `perf_micro` is heavy and takes about 20 minutes.
Running benchmarks on unrelated changes is useless, so the patch
limits the list of files that will trigger a run.

NO_CHANGELOG=performance
NO_DOC=performance
NO_TEST=performance

(cherry picked from commit 7db102c78)
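
For context: GitHub Actions lets a workflow narrow its triggers with a `paths`
filter. A minimal sketch of such a filter is shown below; the path globs are
hypothetical, since the actual list chosen by the patch is not shown here.

    # sketch of a paths filter for the perf_micro workflow (hypothetical globs)
    on:
      push:
        paths:
          - 'src/**'
          - 'perf/**'
          - '.github/workflows/perf_micro.yml'

With such a filter, pushes that touch none of the listed paths skip the heavy
benchmark run.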

69657 of 123524 branches covered (56.39%)

102613 of 117461 relevant lines covered (87.36%)

2936086.31 hits per line

Source File

90.94% /src/box/box.cc
1
/*
2
 * Copyright 2010-2016, Tarantool AUTHORS, please see AUTHORS file.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 * 1. Redistributions of source code must retain the above
9
 *    copyright notice, this list of conditions and the
10
 *    following disclaimer.
11
 *
12
 * 2. Redistributions in binary form must reproduce the above
13
 *    copyright notice, this list of conditions and the following
14
 *    disclaimer in the documentation and/or other materials
15
 *    provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
18
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
19
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
21
 * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
22
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
25
 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
28
 * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
29
 * SUCH DAMAGE.
30
 */
31
#include "box/box.h"
32

33
#include "trivia/config.h"
34

35
#include <sys/utsname.h>
36
#include <spawn.h>
37

38
#include "box/allocator.h"
39
#include "lua/utils.h" /* lua_hash() */
40
#include "fiber_pool.h"
41
#include <say.h>
42
#include <scoped_guard.h>
43
#include "identifier.h"
44
#include "iproto.h"
45
#include "iproto_constants.h"
46
#include "recovery.h"
47
#include "wal.h"
48
#include "relay.h"
49
#include "applier.h"
50
#include <rmean.h>
51
#include "main.h"
52
#include "tuple.h"
53
#include "tuple_format.h"
54
#include "session.h"
55
#include "schema.h"
56
#include "engine.h"
57
#include "memtx_engine.h"
58
#include "memtx_space.h"
59
#include "memcs_engine.h"
60
#include "sysview.h"
61
#include "blackhole.h"
62
#include "service_engine.h"
63
#include "vinyl.h"
64
#include "space.h"
65
#include "index.h"
66
#include "port.h"
67
#include "txn.h"
68
#include "txn_limbo.h"
69
#include "user.h"
70
#include "cfg.h"
71
#include "coio.h"
72
#include "replication.h" /* replica */
73
#include "title.h"
74
#include "xrow.h"
75
#include "xrow_io.h"
76
#include "xstream.h"
77
#include "authentication.h"
78
#include "security.h"
79
#include "path_lock.h"
80
#include "gc.h"
81
#include "sql.h"
82
#include "systemd.h"
83
#include "call.h"
84
#include "crash.h"
85
#include "func.h"
86
#include "sequence.h"
87
#include "sql_stmt_cache.h"
88
#include "msgpack.h"
89
#include "raft.h"
90
#include "watcher.h"
91
#include "audit.h"
92
#include "trivia/util.h"
93
#include "version.h"
94
#include "mp_uuid.h"
95
#include "flightrec.h"
96
#include "wal_ext.h"
97
#include "mp_util.h"
98
#include "small/static.h"
99
#include "memory.h"
100
#include "node_name.h"
101
#include "tt_sort.h"
102
#include "event.h"
103
#include "tweaks.h"
104

105
static char status[64] = "unconfigured";
106

107
/**
108
 * Storage for the WAL vclock: not the in-memory vclock, but specifically the
109
 * vclock of the end of the WAL.
110
 *
111
 * It is encapsulated inside this file to protect it from any illegal changes
112
 * by outside code.
113
 */
114
static struct vclock instance_vclock_storage;
115

116
/** box.stat rmean */
117
struct rmean *rmean_box;
118

119
double on_shutdown_trigger_timeout = 3.0;
120

121
double txn_timeout_default;
122

123
struct rlist box_on_shutdown_trigger_list =
124
        RLIST_HEAD_INITIALIZER(box_on_shutdown_trigger_list);
125

126
struct event *box_on_shutdown_event = NULL;
127

128
const struct vclock *box_vclock = instance_vclock;
129
const struct vclock *instance_vclock = &instance_vclock_storage;
130

131
const char *box_auth_type;
132

133
const char *box_ballot_event_key = "internal.ballot";
134

135
struct tt_uuid bootstrap_leader_uuid;
136

137
bool box_is_force_recovery = false;
138

139
/**
140
 * If backup is in progress, this points to the gc reference
141
 * object that prevents the garbage collector from deleting
142
 * the checkpoint files that are currently being backed up.
143
 *
144
 * If there is no in-progress backup, it is set to NULL.
145
 */
146
static struct gc_checkpoint_ref *backup_gc;
147

148
bool box_read_ffi_is_disabled;
149

150
/**
151
 * Counter behind box_read_ffi_is_disabled: FFI is re-enabled
152
 * when it hits zero.
153
 */
154
static int box_read_ffi_disable_count;
155

156
/**
157
 * The instance is in read-write mode: the local checkpoint
158
 * and all write ahead logs are processed. For a replica,
159
 * it also means we've successfully connected to the master
160
 * and began receiving updates from it.
161
 */
162
static bool is_box_configured = false;
163
static bool is_storage_initialized = false;
164
/** Set if storage shutdown is started. */
165
static bool is_storage_shutdown = false;
166
static bool is_ro = true;
167
static fiber_cond ro_cond;
168

169
/**
170
 * The following flag is set if the instance failed to
171
 * synchronize to a sufficient number of replicas to form
172
 * a quorum and so was forced to switch to read-only mode.
173
 */
174
static bool is_orphan;
175

176
/**
177
 * Summary flag incorporating all the instance attributes,
178
 * affecting ability to write. Currently these are:
179
 * - is_ro;
180
 * - is_orphan;
181
 */
182
static bool is_ro_summary = true;
183

184
/**
185
 * The pool of fibers in the transaction processor thread
186
 * working on incoming messages from net, wal and other
187
 * threads.
188
 */
189
static struct fiber_pool tx_fiber_pool;
190
/**
191
 * A separate endpoint for WAL wakeup messages, to
192
 * ensure that WAL messages are delivered even
193
 * if all fibers in tx_fiber_pool are used. Without
194
 * this endpoint, tx thread could deadlock when there
195
 * are too many messages in flight (gh-1892).
196
 */
197
static struct cbus_endpoint tx_prio_endpoint;
198

199
struct event *box_on_recovery_state_event;
200

201
/** Cached event 'tarantool.trigger.on_change'. */
202
static struct event *tarantool_trigger_on_change_event;
203

204
/**
205
 * Recovery states supported by on_recovery_state triggers.
206
 * They are positioned in the order of appearance during the initial box.cfg().
207
 * The only exceptions are WAL_RECOVERED and INDEXES_BUILT, which might come in
208
 * any order, since the moment when secondary indexes are built depends on
209
 * box.cfg.force_recovery and box.cfg.memtx_use_mvcc_engine.
210
 */
211
enum box_recovery_state {
212
        /**
213
         * The node has either recovered the snapshot from the disk, received
214
         * the snapshot from the remote master as part of initial join stage or
215
         * has bootstrapped the cluster.
216
         */
217
        RECOVERY_STATE_SNAPSHOT_RECOVERED,
218
        /**
219
         * The node has either recovered the local WAL files, received the WALs
220
         * from the remote master as part of final join or has bootstrapped the
221
         * cluster.
222
         */
223
        RECOVERY_STATE_WAL_RECOVERED,
224
        /**
225
         * The node has built secondary indexes for memtx spaces.
226
         */
227
        RECOVERY_STATE_INDEXES_BUILT,
228
        /**
229
         * The node has synced with remote peers. IOW, it has transitioned from
230
         * "orphan" to "running".
231
         */
232
        RECOVERY_STATE_SYNCED,
233
        box_recovery_state_MAX,
234
};
235

236
static const char *box_recovery_state_strs[box_recovery_state_MAX] = {
237
        /* [RECOVERY_STATE_SNAPSHOT_RECOVERED] = */
238
        "snapshot_recovered",
239
        /* [RECOVERY_STATE_WAL_RECOVERED] = */
240
        "wal_recovered",
241
        /* [RECOVERY_STATE_INDEXES_BUILT] = */
242
        "indexes_built",
243
        /* [RECOVERY_STATE_SYNCED] = */
244
        "synced",
245
};
246

247
/** Whether the triggers for "synced" recovery stage have already run. */
248
static bool recovery_state_synced_is_reached;
249

250
/** PATH_MAX is too big; 2K is the recommended limit for a web address. */
251
#define BOX_FEEDBACK_HOST_MAX 2048
252

253
/** Feedback host URL to send crash info to. */
254
static char box_feedback_host[BOX_FEEDBACK_HOST_MAX];
255

256
/** Whether sending crash info to feedback URL is enabled. */
257
static bool box_feedback_crash_enabled;
258

259
#ifdef TEST_BUILD
260
/**
261
 * Set the timeout to infinity in test builds because, first, not all CI tests
262
 * currently treat a non-zero exit code of a Tarantool instance as a failure.
263
 * Also, in luatest it is currently easier to test for the absence of hangs
264
 * than for the Tarantool instance exit code.
265
 */
266
#define BOX_SHUTDOWN_TIMEOUT_DEFAULT TIMEOUT_INFINITY
267
#else
268
#define BOX_SHUTDOWN_TIMEOUT_DEFAULT 3.0
269
#endif
270

271
/** Timeout for waiting for client-related fibers to finish. */
272
static double box_shutdown_timeout = BOX_SHUTDOWN_TIMEOUT_DEFAULT;
273
TWEAK_DOUBLE(box_shutdown_timeout);
8,265✔
274

275
/** Idle timeout for box fiber pool. */
276
static double box_fiber_pool_idle_timeout = FIBER_POOL_IDLE_TIMEOUT;
277
TWEAK_DOUBLE(box_fiber_pool_idle_timeout);
8,265✔
278

279
static int
280
box_run_on_recovery_state(enum box_recovery_state state)
17,534✔
281
{
282
        assert(state >= 0 && state < box_recovery_state_MAX);
17,534!
283
        const char *state_str = box_recovery_state_strs[state];
17,534✔
284
        struct port args;
285
        port_c_create(&args);
17,534!
286
        port_c_add_str0(&args, state_str);
17,534!
287
        int rc = event_run_triggers(box_on_recovery_state_event, &args);
17,534!
288
        port_destroy(&args);
17,534!
289
        return rc;
17,534✔
290
}
291

292
static void
293
box_storage_init(void);
294

295
/**
296
 * A timer to broadcast the updated vclock. Doing this on each vclock update
297
 * would be too expensive.
298
 */
299
static ev_timer box_broadcast_ballot_timer;
300

301
/** Set a new interval for vclock updates in ballot. */
302
static void
303
box_update_broadcast_ballot_interval(double interval)
4,910✔
304
{
305
        static double ballot_broadcast_interval;
306
        /* Do the broadcast at least once a second. */
307
        interval = MIN(interval, 1.0);
4,910✔
308
        if (interval == ballot_broadcast_interval)
4,910✔
309
                return;
394✔
310
        double timeout = ev_timer_remaining(loop(),
4,516!
311
                                            &box_broadcast_ballot_timer);
312
        timeout -= ballot_broadcast_interval;
4,516✔
313
        timeout += interval;
4,516✔
314
        ev_timer_stop(loop(), &box_broadcast_ballot_timer);
4,516!
315
        ev_timer_set(&box_broadcast_ballot_timer, timeout, interval);
4,516✔
316
        ev_timer_start(loop(), &box_broadcast_ballot_timer);
4,516!
317
        ballot_broadcast_interval = interval;
4,516✔
318
}
319

320
/** A callback to broadcast updated vclock in ballot by timeout. */
321
static void
322
box_broadcast_ballot_on_timeout(ev_loop *loop, ev_timer *timer, int events)
98,457✔
323
{
324
        (void)loop;
325
        (void)timer;
326
        (void)events;
327
        static struct vclock broadcast_vclock;
328
        if (vclock_compare_ignore0(&broadcast_vclock, instance_vclock) == 0)
98,457✔
329
                return;
90,921✔
330
        box_broadcast_ballot();
7,536✔
331
        vclock_copy(&broadcast_vclock, instance_vclock);
7,536✔
332
}
333

334
/**
335
 * Generate and update the instance status title
336
 */
337
static void
338
title(const char *new_status)
18,336✔
339
{
340
        snprintf(status, sizeof(status), "%s", new_status);
18,336✔
341
        title_set_status(new_status);
18,336✔
342
        title_update();
18,336✔
343
        systemd_snotify("STATUS=%s", status);
18,336✔
344
        box_broadcast_status();
18,336✔
345
}
18,336✔
346

347
void
348
box_update_ro_summary(void)
29,645✔
349
{
350
        bool old_is_ro_summary = is_ro_summary;
29,645✔
351
        is_ro_summary = is_ro || is_orphan || raft_is_ro(box_raft()) ||
43,457✔
352
                        txn_limbo_is_ro(&txn_limbo);
13,812✔
353
        /* In 99% nothing changes. Filter this out first. */
354
        if (is_ro_summary == old_is_ro_summary)
29,645✔
355
                return;
20,588✔
356

357
        if (is_ro_summary)
9,057✔
358
                engine_switch_to_ro();
2,583✔
359
        fiber_cond_broadcast(&ro_cond);
9,057✔
360
        box_broadcast_status();
9,057✔
361
        box_broadcast_election();
9,057✔
362
        box_broadcast_ballot();
9,057✔
363
}
364

365
API_EXPORT const char *
366
box_ro_reason(void)
293✔
367
{
368
        if (raft_is_ro(box_raft()))
293✔
369
                return "election";
17✔
370
        if (txn_limbo_is_ro(&txn_limbo))
276✔
371
                return "synchro";
9✔
372
        if (is_ro)
267✔
373
                return "config";
69✔
374
        if (is_orphan)
198✔
375
                return "orphan";
12✔
376
        return NULL;
186✔
377
}
378

379
int
380
box_check_slice_slow(void)
19,220✔
381
{
382
        return fiber_check_slice();
19,220✔
383
}
384

385
int
386
box_check_writable(void)
193,523✔
387
{
388
        if (!is_ro_summary)
193,523✔
389
                return 0;
193,463✔
390
        struct error *e = diag_set(ClientError, ER_READONLY);
60!
391
        struct raft *raft = box_raft();
60✔
392
        error_append_msg(e, " - ");
60✔
393
        error_set_str(e, "reason", box_ro_reason());
60✔
394
        /*
395
         * In case of multiple reasons at the same time only one is reported.
396
         * But the order is important. For example, if the instance has election
397
         * enabled, it is better for the client to see that it is a 'follower'
398
         * and who the leader is than just to see that cfg 'read_only' is true.
399
         */
400
        if (raft_is_ro(raft)) {
60✔
401
                const char *state = raft_state_str(raft->state);
10✔
402
                uint64_t term = raft->volatile_term;
10✔
403
                error_set_str(e, "state", state);
10✔
404
                error_set_uint(e, "term", term);
10✔
405
                error_append_msg(e, "state is election %s with term %llu",
10✔
406
                                 state, (unsigned long long)term);
407
                uint32_t id = raft->leader;
10✔
408
                if (id != REPLICA_ID_NIL) {
10✔
409
                        error_set_uint(e, "leader_id", id);
4✔
410
                        error_append_msg(e, ", leader is %u", id);
4✔
411
                        struct replica *r = replica_by_id(id);
4✔
412
                        /*
413
                         * XXX: when the leader is dropped from _cluster, it
414
                         * is not reported to Raft.
415
                         */
416
                        if (r != NULL) {
4✔
417
                                error_set_uuid(e, "leader_uuid", &r->uuid);
3✔
418
                                error_append_msg(e, " (%s)",
3✔
419
                                                 tt_uuid_str(&r->uuid));
3✔
420
                        }
421
                }
422
        } else if (txn_limbo_is_ro(&txn_limbo)) {
50✔
423
                uint32_t id = txn_limbo.owner_id;
6✔
424
                uint64_t term = txn_limbo.promote_greatest_term;
6✔
425
                error_set_uint(e, "queue_owner_id", id);
6✔
426
                error_set_uint(e, "term", term);
6✔
427
                error_append_msg(e, "synchro queue with term %llu belongs "
6✔
428
                                 "to %u", (unsigned long long)term,
429
                                 (unsigned)id);
430
                struct replica *r = replica_by_id(id);
6✔
431
                /*
432
                 * XXX: when an instance is deleted from _cluster, its limbo's
433
                 * ownership is not cleared.
434
                 */
435
                if (r != NULL) {
6✔
436
                        error_set_uuid(e, "queue_owner_uuid", &r->uuid);
5✔
437
                        error_append_msg(e, " (%s)", tt_uuid_str(&r->uuid));
5✔
438
                }
439
                if (txn_limbo_is_owned_by_current_instance(&txn_limbo)) {
6✔
440
                        if (txn_limbo.is_frozen_due_to_fencing) {
2!
441
                                error_append_msg(e, " and is frozen due to "
2✔
442
                                                    "fencing");
443
                        } else if (txn_limbo.is_frozen_until_promotion) {
×
444
                                error_append_msg(e, " and is frozen until "
×
445
                                                    "promotion");
446
                        }
447
                }
448
        } else {
449
                if (is_ro)
44✔
450
                        error_append_msg(e, "box.cfg.read_only is true");
33✔
451
                else if (is_orphan)
11!
452
                        error_append_msg(e, "it is an orphan");
11✔
453
                else
454
                        assert(false);
×
455
        }
456
        diag_log();
60✔
457
        return -1;
60✔
458
}
459

460
static void
461
box_check_writable_xc(void)
1,545✔
462
{
463
        if (box_check_writable() != 0)
1,545✔
464
                diag_raise();
9✔
465
}
1,536✔
466

467
static void
468
box_check_memtx_min_tuple_size(ssize_t memtx_min_tuple_size)
4,503✔
469
{
470

471
        if (memtx_min_tuple_size < 8 || memtx_min_tuple_size > 1048280)
4,503✔
472
        tnt_raise(ClientError, ER_CFG, "memtx_min_tuple_size",
5!
473
                  "specified value is out of bounds");
474
}
4,498✔
475

476
int
477
box_process_rw(struct request *request, struct space *space,
13,459,100✔
478
               struct tuple **result)
479
{
480
        struct tuple *tuple = NULL;
13,459,100✔
481
        bool return_tuple = false;
13,459,100✔
482
        struct txn *txn = in_txn();
13,459,100!
483
        bool is_autocommit = txn == NULL;
13,459,100✔
484
        if (is_autocommit && (txn = txn_begin()) == NULL)
13,459,100!
485
                return -1;
×
486
        assert(iproto_type_is_dml(request->type));
13,459,100!
487
        rmean_collect(rmean_box, request->type, 1);
13,459,100!
488
        if (access_check_space(space, PRIV_W) != 0)
13,459,100!
489
                goto rollback;
30✔
490
        if (txn_begin_stmt(txn, space, request->type) != 0)
13,459,100!
491
                goto rollback;
15,388✔
492
        if (space_execute_dml(space, txn, request, &tuple) != 0) {
13,443,700!
493
                txn_rollback_stmt(txn);
61,046!
494
                goto rollback;
61,046✔
495
        }
496
        if (result != NULL)
13,382,700✔
497
                *result = tuple;
11,672,800✔
498

499
        return_tuple = result != NULL && tuple != NULL;
13,382,700✔
500
        if (return_tuple) {
13,382,700✔
501
                /*
502
                 * Pin the tuple locally before the commit,
503
                 * otherwise it may go away during the yield
504
                 * when WAL is written in autocommit mode.
505
                 */
506
                tuple_ref(tuple);
11,228,700!
507
        }
508

509
        if (txn_commit_stmt(txn, request))
13,382,700!
510
                goto rollback;
1,600✔
511

512
        if (is_autocommit && txn_commit(txn) < 0)
13,381,100!
513
                goto error;
1,208✔
514
        if (return_tuple) {
13,379,900✔
515
                tuple_bless(tuple);
11,226,000!
516
                tuple_unref(tuple);
11,226,000!
517
        }
518
        return 0;
13,379,900✔
519

520
rollback:
78,064✔
521
        if (is_autocommit)
78,064✔
522
                txn_abort(txn);
7,103!
523
error:
70,961✔
524
        if (return_tuple)
79,272✔
525
                tuple_unref(tuple);
2,646!
526
        return -1;
79,272✔
527
}
528

529
static bool
530
box_check_ro(void);
531

532
void
533
box_set_ro(void)
4,811✔
534
{
535
        is_ro = box_check_ro();
4,811✔
536
        box_update_ro_summary();
4,810✔
537
}
4,810✔
538

539
API_EXPORT bool
540
box_is_ro(void)
52,406✔
541
{
542
        return is_ro_summary;
52,406✔
543
}
544

545
bool
546
box_is_orphan(void)
3,628✔
547
{
548
        return is_orphan;
3,628✔
549
}
550

551
bool
552
box_is_anon(void)
15,364✔
553
{
554
        return instance_id == REPLICA_ID_NIL;
15,364✔
555
}
556

557
API_EXPORT int
558
box_wait_ro(bool ro, double timeout)
374✔
559
{
560
        double deadline = ev_monotonic_now(loop()) + timeout;
374!
561
        while (is_box_configured == false || box_is_ro() != ro) {
436✔
562
                if (fiber_cond_wait_deadline(&ro_cond, deadline) != 0)
258✔
563
                        return -1;
196✔
564
        }
565
        return 0;
178✔
566
}
567

568
void
569
box_do_set_orphan(bool orphan)
11,742✔
570
{
571
        is_orphan = orphan;
11,742✔
572
        box_update_ro_summary();
11,742✔
573
        if (!is_orphan && !recovery_state_synced_is_reached) {
11,742✔
574
                box_run_on_recovery_state(RECOVERY_STATE_SYNCED);
1,495✔
575
                recovery_state_synced_is_reached = true;
1,495✔
576
        }
577
}
11,742✔
578

579
void
580
box_set_orphan(bool orphan)
9,485✔
581
{
582
        box_do_set_orphan(orphan);
9,485✔
583
        /* Update the title to reflect the new status. */
584
        if (is_orphan) {
9,485✔
585
                say_info("entering orphan mode");
207!
586
                title("orphan");
207✔
587
        } else {
588
                say_info("leaving orphan mode");
9,278✔
589
                title("running");
9,278✔
590
        }
591
}
9,485✔
592

593
struct wal_stream {
594
        /** Base class. */
595
        struct xstream base;
596
        /** The lsregion for allocating rows. */
597
        struct lsregion lsr;
598
        /** The array of lists keeping rows from xlog. */
599
        struct rlist nodes_rows[VCLOCK_MAX];
600
        /** Current transaction ID. 0 when no transaction. */
601
        int64_t tsn;
602
        /**
603
         * LSN of the first row saved to check TSN and LSN match in case all
604
         * rows of the tx appeared to be local.
605
         */
606
        int64_t first_row_lsn;
607
        /**
608
         * Flag whether there is a pending yield to do when the current
609
         * transaction is finished. It can't always be done right away because
610
         * would abort the current transaction if it is memtx.
611
         */
612
        bool has_yield;
613
        /**
614
         * True if any row in the transaction was global. Saved to check if TSN
615
         * matches LSN of a first global row.
616
         */
617
        bool has_global_row;
618
};
619

620
/**
621
 * A stub used in txn_commit() during local recovery. We "replay"
622
 * transactions during local recovery, with WAL turned off.
623
 * Since each transaction attempts to write itself to WAL at
624
 * commit, we need an implementation which would fake WAL write.
625
 */
626
struct recovery_journal {
627
        struct journal base;
628
        struct vclock *vclock;
629
};
630

631
/**
632
 * Use the current row LSN as commit LSN - vinyl needs to see the
633
 * exact same signature during local recovery to properly mark
634
 * min/max LSN of created LSM levels.
635
 */
636
static int
637
recovery_journal_write(struct journal *base,
61,282✔
638
                       struct journal_entry *entry)
639
{
640
        struct recovery_journal *journal = (struct recovery_journal *) base;
61,282✔
641
        for (int i = 0; i < entry->n_rows; ++i)
241,790✔
642
                vclock_follow_xrow(journal->vclock, entry->rows[i]);
180,508✔
643
        entry->res = vclock_sum(journal->vclock);
61,282✔
644
        /*
645
         * Since there're no actual writes, fire a
646
         * journal_async_complete callback right away.
647
         */
648
        journal_async_complete(entry);
61,282✔
649
        return 0;
61,282✔
650
}
651

652
static void
653
recovery_journal_create(struct vclock *v)
1,550✔
654
{
655
        static struct recovery_journal journal;
656
        journal_create(&journal.base, recovery_journal_write);
1,550✔
657
        journal.vclock = v;
1,550✔
658
        journal_set(&journal.base);
1,550✔
659
}
1,550✔
660

661
/**
662
 * Drop the stream to its initial state. This is supposed to be done when an
663
 * error happens, because in case of force recovery the stream will keep
664
 * receiving tuples, so it must stay in a valid state and handle them somehow.
665
 *
666
 * Now the stream simply drops the current transaction like it never happened,
667
 * even if its commit-row wasn't met yet. Should be good enough for
668
 * force-recovery when the consistency is already out of the game.
669
 */
670
static void
671
wal_stream_abort(struct wal_stream *stream)
38✔
672
{
673
        struct txn *tx = in_txn();
38✔
674
        if (tx != NULL)
38✔
675
                txn_abort(tx);
7✔
676
        stream->tsn = 0;
38✔
677
}
38✔
678

679
/**
680
 * The wrapper exists only for debugging purposes, to ensure tsn being non-0 is
681
 * in sync with the fiber's txn being non-NULL. It has nothing to do with the
682
 * journal content, and therefore can use assertions instead of rigorous error
683
 * checking even in release.
684
 */
685
static bool
686
wal_stream_has_tx_in_progress(const struct wal_stream *stream)
360,793✔
687
{
688
        bool has = stream->tsn != 0;
360,793✔
689
        assert(has == (in_txn() != NULL));
360,793!
690
        return has;
360,793✔
691
}
692

693
/**
694
 * The function checks that the tail of the transaction from the journal
695
 * has been read. A mixed transaction consists of multiple transactions
696
 * that are mixed together. We need to make sure that after the restoration
697
 * is completed, there are no unfinished transactions left in the
698
 * nodes_rows array. A transaction reported by wal_stream_has_tx_in_progress
699
 * has all its data, but simply isn't committed yet.
700
 */
701
static bool
702
wal_stream_has_unfinished_tx(const struct wal_stream *stream)
818✔
703
{
704
        if (wal_stream_has_tx_in_progress(stream))
818!
705
                return true;
×
706
        const struct rlist *nodes_rows = stream->nodes_rows;
818✔
707
        for (int i = 0; i < VCLOCK_MAX; i++) {
26,964✔
708
                if (!rlist_empty((struct rlist *)&nodes_rows[i]))
52,294✔
709
                        return true;
1✔
710
        }
711
        return false;
817✔
712
}
713

714
static int
715
wal_stream_apply_synchro_row(struct wal_stream *stream, struct xrow_header *row)
426✔
716
{
717
        assert(iproto_type_is_synchro_request(row->type));
426!
718
        if (wal_stream_has_tx_in_progress(stream)) {
426!
719
                diag_set(XlogError, "found synchro request in a transaction");
×
720
                return -1;
×
721
        }
722
        struct synchro_request syn_req;
723
        if (xrow_decode_synchro(row, &syn_req, NULL) != 0) {
426!
724
                say_error("couldn't decode a synchro request");
×
725
                return -1;
×
726
        }
727
        if (journal_write_row(row) != 0)
426!
728
                return -1;
×
729
        return txn_limbo_process(&txn_limbo, &syn_req);
426!
730
}
731

732
static int
733
wal_stream_apply_raft_row(struct wal_stream *stream, struct xrow_header *row)
134✔
734
{
735
        assert(iproto_type_is_raft_request(row->type));
134!
736
        if (wal_stream_has_tx_in_progress(stream)) {
134!
737
                diag_set(XlogError, "found raft request in a transaction");
×
738
                return -1;
×
739
        }
740
        struct raft_request raft_req;
741
        /* Vclock is never persisted in WAL by Raft. */
742
        if (xrow_decode_raft(row, &raft_req, NULL) != 0) {
134!
743
                say_error("couldn't decode a raft request");
×
744
                return -1;
×
745
        }
746
        if (journal_write_row(row) != 0)
134!
747
                return -1;
×
748
        box_raft_recover(&raft_req);
134!
749
        return 0;
134✔
750
}
751

752
/**
753
 * Rows of the same transaction are wrapped into begin/commit. Mostly for the
754
 * sake of synchronous replication, when the log can contain rolled back
755
 * transactions, which must be entirely reverted during recovery when ROLLBACK
756
 * records are met. Row-by-row recovery wouldn't work for multi-statement
757
 * synchronous transactions.
758
 */
759
static int
760
wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
179,112✔
761
{
762
        struct request request;
763
        uint64_t req_type = dml_request_key_map(row->type);
179,112✔
764
        if (xrow_decode_dml(row, &request, req_type) != 0) {
179,112!
765
                say_error("couldn't decode a DML request");
×
766
                return -1;
×
767
        }
768
        /*
769
         * Note that all the information which came from the log is validated
770
         * and the errors are handled, not asserted or panicked on. That is for
771
         * the sake of force recovery, which must be able to recover everything
772
         * it possibly can instead of terminating the instance.
773
         */
774
        struct txn *txn;
775
        if (stream->tsn == 0) {
179,112✔
776
                if (row->tsn == 0) {
59,887!
777
                        diag_set(XlogError, "found a row without TSN");
×
778
                        goto end_diag_request;
×
779
                }
780
                stream->tsn = row->tsn;
59,887✔
781
                stream->first_row_lsn = row->lsn;
59,887✔
782
                stream->has_global_row = false;
59,887✔
783
                /*
784
                 * Rows are not stacked into a list like during replication,
785
                 * because recovery does not yield while reading the rows. All
786
                 * the yields are controlled by the stream, and therefore no
787
                 * need to wait for all the rows to start a transaction. Can
788
                 * start now, apply the rows, and make a yield after commit if
789
                 * necessary. Helps to avoid a lot of copying.
790
                 */
791
                txn = txn_begin();
59,887!
792
                if (txn == NULL) {
59,887!
793
                        say_error("couldn't begin a recovery transaction");
×
794
                        return -1;
×
795
                }
796
        } else if (row->tsn != stream->tsn) {
119,225✔
797
                diag_set(XlogError, "found a next transaction with the "
1!
798
                         "previous one not yet committed");
799
                goto end_diag_request;
1✔
800
        } else {
801
                txn = in_txn();
119,224!
802
        }
803
        /* Ensure TSN is equal to LSN of the first global row. */
804
        if (!stream->has_global_row && row->group_id != GROUP_LOCAL) {
179,111✔
805
                if (row->tsn != row->lsn) {
59,216✔
806
                        diag_set(XlogError, "found a first global row in a "
1!
807
                                 "transaction with LSN/TSN mismatch");
808
                        goto end_diag_request;
1✔
809
                }
810
                stream->has_global_row = true;
59,215✔
811
        }
812
        assert(wal_stream_has_tx_in_progress(stream));
179,110!
813
        /* Nops might appear at least after before_replace skipping rows. */
814
        if (request.type != IPROTO_NOP) {
179,110✔
815
                struct space *space = space_cache_find(request.space_id);
178,977!
816
                if (space == NULL) {
178,977!
817
                        say_error("couldn't find space by ID");
×
818
                        goto end_diag_request;
×
819
                }
820
                if (box_process_rw(&request, space, NULL) != 0) {
178,977!
821
                        say_error("couldn't apply the request");
5!
822
                        goto end_diag_request;
5✔
823
                }
824
        } else {
825
                if (txn_begin_stmt(txn, NULL, request.type) != 0)
133!
826
                        goto end_diag_request;
×
827
                if (txn_commit_stmt(txn, &request))
133!
828
                        goto end_diag_request;
×
829

830
        }
831
        assert(txn != NULL);
179,105!
832
        if (!row->is_commit)
179,105✔
833
                return 0;
119,225✔
834
        if (row->wait_ack)
59,880✔
835
                box_txn_make_sync();
1,423✔
836
        /*
837
         * For fully local transactions the TSN check won't work like for global
838
         * transactions, because it is not known if there are global rows until
839
         * commit arrives.
840
         */
841
        if (!stream->has_global_row && stream->tsn != stream->first_row_lsn) {
59,880!
842
                diag_set(XlogError, "fully local transaction's TSN does not "
×
843
                         "match LSN of the first row");
844
                return -1;
×
845
        }
846
        stream->tsn = 0;
59,880✔
847
        /*
848
         * During local recovery the commit procedure should be async, otherwise
849
         * the only fiber processing recovery will get stuck on the first
850
         * synchronous tx it meets until confirm timeout is reached and the tx
851
         * is rolled back, yielding an error.
852
         * Moreover, txn_commit_submit() doesn't hurt at all during local
853
         * recovery, since journal_write is faked at this stage and returns
854
         * immediately.
855
         */
856
        if (txn_commit_submit(txn) != 0) {
59,880!
857
                /* Commit fail automatically leads to rollback. */
858
                assert(in_txn() == NULL);
×
859
                say_error("couldn't commit a recovery transaction");
×
860
                return -1;
×
861
        }
862
        assert(in_txn() == NULL);
59,880!
863
        return 0;
59,880✔
864

865
end_diag_request:
7✔
866
        /*
867
         * The label must be used only for the errors related directly to the
868
         * request. Errors like txn_begin() fail has nothing to do with it, and
869
         * therefore don't log the request as the fault reason.
870
         */
871
        say_error("error at request: %s", request_str(&request));
7!
872
        return -1;
7✔
873
}
874

875
/**
876
 * Keeping added rows in a rlist to separate mixed transactions
877
 */
878
struct wal_row {
879
        /** Base class. */
880
        struct xrow_header row;
881
        /** A link on the list of rows stacked in the nodes_rows array. */
882
        struct rlist in_row_list;
883
        /** A growing identifier to track lsregion allocations. */
884
        int64_t lsr_id;
885
};
886

887
/**
888
 * Callback to stash row and row bodies upon receipt, used to recover
889
 * mixed transactions.
890
 */
891
static struct wal_row *
892
wal_stream_save_row(struct wal_stream *stream,
762✔
893
                    const struct xrow_header *row)
894
{
895
        static int64_t lsr_id = 0;
896
        struct lsregion *lsr = &stream->lsr;
762✔
897
        struct wal_row *new_row = xlsregion_alloc_object(lsr, ++lsr_id,
762!
898
                                                         typeof(*new_row));
899
        new_row->lsr_id = lsr_id;
762✔
900
        new_row->row = *row;
762✔
901

902
        assert(new_row->row.bodycnt <= 1);
762!
903
        if (new_row->row.bodycnt == 1) {
762!
904
                size_t len = new_row->row.body[0].iov_len;
762✔
905
                char *new_base = (char *)xlsregion_alloc(lsr, len, lsr_id);
762!
906
                memcpy(new_base, new_row->row.body[0].iov_base, len);
762✔
907
                /* Adjust row body pointers. */
908
                new_row->row.body[0].iov_base = new_base;
762✔
909
        }
910
        return new_row;
762✔
911
}
912

913
/**
914
 * Find the min lsr_id that is still needed.
915
 */
916
static int64_t
917
wal_stream_find_min_lsr_id(struct wal_stream *stream)
699✔
918
{
919
        int64_t min_lsr_id = INT64_MAX;
699✔
920
        struct rlist *nodes_rows = stream->nodes_rows;
699✔
921
        struct wal_row *item;
922
        /*
923
         * For each new row, lsr_id is incremented by 1. Only rows from
924
         * different replica_ids can get mixed, so for each replica_id,
925
         * the very first row has the smallest lsr_id of all rows with that
926
         * replica_id. Thus, once a transaction has been read to the end, we
927
         * iterate through the nodes_rows array of lists and see which of
928
         * the first rows for each replica_id has the lowest lsr_id.
929
         * This lsr_id is still needed, but anything less than it is not,
930
         * so we can free everything up to this lsr_id.
931
         */
932
        for (uint32_t i = 0; i < VCLOCK_MAX; i++) {
23,067✔
933
                if (!rlist_empty(&nodes_rows[i])) {
44,736✔
934
                        item = rlist_first_entry(&nodes_rows[i], wal_row,
3✔
935
                                                 in_row_list);
936
                        if (item->lsr_id <= min_lsr_id)
3!
937
                                min_lsr_id = item->lsr_id - 1;
3✔
938
                }
939
        }
940
        return min_lsr_id;
699✔
941
}
942

943
/**
944
 * Deallocating memory for the wal_row
945
 */
946
static void
947
wal_stream_gc(struct wal_stream *stream)
699✔
948
{
949
        struct lsregion *lsr = &stream->lsr;
699✔
950
        int64_t lsr_id = wal_stream_find_min_lsr_id(stream);
699✔
951
        lsregion_gc(lsr, lsr_id);
699✔
952
}
699✔
953

954
/**
955
 * When restoring the log is read row-by-row. However, it is necessary
956
 * to store the rows for correct further recovery. For example, with
957
 * mixed transactions. The function saves the again coming string to the
958
 * rlist. This rlist is stored in nodes_rows array in replica_id cell.
959
 * As soon as a row arrives with the is_commit flag set, the corresponding
960
 * transaction tries to apply. If an error occurs the transaction is
961
 * rolled back. And we continue to work with the next transaction.
962
 */
963
static int
964
wal_stream_apply_mixed_dml_row(struct wal_stream *stream,
762✔
965
                               struct xrow_header *row)
966
{
967
        /*
968
         * A local row can be part of any node's transaction. Try to find the
969
         * right transaction by tsn. If this fails, assume the row belongs to
970
         * this node.
971
         */
972
        uint32_t id;
973
        struct rlist *nodes_rows = stream->nodes_rows;
762✔
974
        if (row->replica_id == 0) {
762✔
975
                id = instance_id;
176✔
976
                for (uint32_t i = 0; i < VCLOCK_MAX; i++) {
4,323✔
977
                        if (rlist_empty(&nodes_rows[i]))
8,390✔
978
                                continue;
4,147✔
979
                        struct wal_row *tmp =
980
                                rlist_last_entry(&nodes_rows[i], typeof(*tmp),
48✔
981
                                                 in_row_list);
982
                        if (tmp->row.tsn == row->tsn) {
48!
983
                                id = i;
48✔
984
                                break;
48✔
985
                        }
986
                }
987
        } else {
988
                id = row->replica_id;
586✔
989
        }
990

991
        struct wal_row *save_row = wal_stream_save_row(stream, row);
762✔
992
        rlist_add_tail_entry(&nodes_rows[id], save_row, in_row_list);
762✔
993

994
        if (!row->is_commit)
762✔
995
                return 0;
63✔
996

997
        int rc = 0;
699✔
998
        struct wal_row *item, *next_row;
999
        rlist_foreach_entry_safe(item, &nodes_rows[id],
2,918!
1000
                                 in_row_list, next_row) {
1001
                rlist_del_entry(item, in_row_list);
760✔
1002
                if (rc != 0)
760✔
1003
                        continue;
1✔
1004
                rc = wal_stream_apply_dml_row(stream, &item->row);
759✔
1005
        }
1006
        /* deallocating the memory */
1007
        wal_stream_gc(stream);
699✔
1008
        assert(rlist_empty(&nodes_rows[id]));
1,398!
1009

1010
        return rc;
699✔
1011
}
1012

1013
/**
1014
 * Yield once in a while, but not too often, mostly to allow signal handling to
1015
 * take place.
1016
 */
1017
static void
1018
wal_stream_try_yield(struct wal_stream *stream)
180,305✔
1019
{
1020
        if (wal_stream_has_tx_in_progress(stream) || !stream->has_yield)
180,305✔
1021
                return;
179,675✔
1022
        stream->has_yield = false;
630✔
1023
        fiber_sleep(0);
630✔
1024
}
1025

1026
static void
1027
wal_stream_apply_row(struct xstream *base, struct xrow_header *row)
179,675✔
1028
{
1029
        struct wal_stream *stream =
1030
                container_of(base, struct wal_stream, base);
179,675✔
1031
        if (iproto_type_is_synchro_request(row->type)) {
179,675✔
1032
                if (wal_stream_apply_synchro_row(stream, row) != 0)
426!
1033
                        goto end_error;
×
1034
        } else if (iproto_type_is_raft_request(row->type)) {
179,249✔
1035
                if (wal_stream_apply_raft_row(stream, row) != 0)
134!
1036
                        goto end_error;
×
1037
        } else if (box_is_force_recovery) {
179,115✔
1038
                if (wal_stream_apply_mixed_dml_row(stream, row) != 0)
762✔
1039
                        goto end_error;
3✔
1040
        } else if (wal_stream_apply_dml_row(stream, row) != 0) {
178,353✔
1041
                goto end_error;
4✔
1042
        }
1043
        wal_stream_try_yield(stream);
179,668✔
1044
        return;
179,668✔
1045

1046
end_error:
7✔
1047
        wal_stream_abort(stream);
7✔
1048
        wal_stream_try_yield(stream);
7✔
1049
        diag_raise();
7✔
1050
}
1051

1052
/**
1053
 * Plan a yield in recovery stream. Wal stream will execute it as soon as it's
1054
 * ready.
1055
 */
1056
static void
1057
wal_stream_schedule_yield(struct xstream *base)
630✔
1058
{
1059
        struct wal_stream *stream = container_of(base, struct wal_stream, base);
630✔
1060
        stream->has_yield = true;
630✔
1061
        wal_stream_try_yield(stream);
630✔
1062
}
630✔
1063

1064
static void
1065
wal_stream_create(struct wal_stream *ctx)
846✔
1066
{
1067
        xstream_create(&ctx->base, wal_stream_apply_row,
846✔
1068
                       wal_stream_schedule_yield);
1069
        lsregion_create(&ctx->lsr, &runtime);
846✔
1070
        for (int i = 0; i < VCLOCK_MAX; i++) {
27,918✔
1071
                rlist_create(&ctx->nodes_rows[i]);
27,072✔
1072
        }
1073
        ctx->tsn = 0;
846✔
1074
        ctx->first_row_lsn = 0;
846✔
1075
        ctx->has_yield = false;
846✔
1076
        ctx->has_global_row = false;
846✔
1077
}
846✔
1078

1079
static void
1080
wal_stream_destroy(struct wal_stream *ctx)
843✔
1081
{
1082
        lsregion_destroy(&ctx->lsr);
843✔
1083
        TRASH(ctx);
843✔
1084
}
843✔
1085

1086
/* {{{ configuration bindings */
1087

1088
/*
1089
 * Check log configuration validity.
1090
 *
1091
 * Used thru Lua FFI.
1092
 */
1093
extern "C" int
1094
say_check_cfg(const char *log,
10,550✔
1095
              MAYBE_UNUSED int level,
1096
              int nonblock,
1097
              const char *format_str)
1098
{
1099
        enum say_logger_type type = SAY_LOGGER_STDERR;
10,550✔
1100
        if (log != NULL && say_parse_logger_type(&log, &type) < 0) {
10,550!
1101
                diag_set(ClientError, ER_CFG, "log",
4!
1102
                         diag_last_error(diag_get())->errmsg);
1103
                return -1;
4✔
1104
        }
1105
        if (type == SAY_LOGGER_SYSLOG) {
10,546✔
1106
                struct say_syslog_opts opts;
1107
                if (say_parse_syslog_opts(log, &opts) < 0) {
17!
1108
                        if (diag_last_error(diag_get())->type ==
2!
1109
                            &type_IllegalParams)
1110
                                diag_set(ClientError, ER_CFG, "log",
2!
1111
                                         diag_last_error(diag_get())->errmsg);
1112
                        return -1;
2✔
1113
                }
1114
                say_free_syslog_opts(&opts);
15!
1115
        }
1116

1117
        enum say_format format = say_format_by_name(format_str);
10,544!
1118
        if (format == say_format_MAX) {
10,544✔
1119
                diag_set(ClientError, ER_CFG, "log_format",
4!
1120
                         "expected 'plain' or 'json'");
1121
                return -1;
4✔
1122
        }
1123
        if (nonblock == 1 &&
10,540✔
1124
            (type == SAY_LOGGER_FILE || type == SAY_LOGGER_STDERR)) {
7✔
1125
                diag_set(ClientError, ER_CFG, "log_nonblock",
5!
1126
                         "the option is incompatible with file/stderr logger");
1127
                return -1;
5✔
1128
        }
1129
        return 0;
10,535✔
1130
}
1131

1132
/**
1133
 * Returns the authentication method corresponding to box.cfg.auth_type.
1134
 * If not found, sets diag and returns NULL.
1135
 */
1136
static const struct auth_method *
1137
box_check_auth_type(void)
9,341✔
1138
{
1139
        const char *auth_type = cfg_gets("auth_type");
9,341✔
1140
        const struct auth_method *method =
1141
                auth_method_by_name(auth_type, strlen(auth_type));
9,341✔
1142
        if (method == NULL) {
9,341✔
1143
                diag_set(ClientError, ER_CFG, "auth_type", auth_type);
1!
1144
                return NULL;
1✔
1145
        }
1146
        return method;
9,340✔
1147
}
1148

1149
static enum election_mode
1150
election_mode_by_name(const char *name)
64,214✔
1151
{
1152
        if (strcmp(name, "off") == 0)
64,214✔
1153
                return ELECTION_MODE_OFF;
60,632✔
1154
        else if (strcmp(name, "voter") == 0)
3,582✔
1155
                return ELECTION_MODE_VOTER;
565✔
1156
        else if (strcmp(name, "manual") == 0)
3,017✔
1157
                return ELECTION_MODE_MANUAL;
660✔
1158
        else if (strcmp(name, "candidate") == 0)
2,357✔
1159
                return ELECTION_MODE_CANDIDATE;
2,355✔
1160

1161
        diag_set(ClientError, ER_CFG, "election_mode",
2!
1162
                "the value must be one of the following strings: "
1163
                "'off', 'voter', 'candidate', 'manual'");
1164
        return ELECTION_MODE_INVALID;
2✔
1165
}
1166

1167
int
1168
box_check_election_mode(enum election_mode *mode)
9,430✔
1169
{
1170
        const char *mode_name = cfg_gets("election_mode");
9,430✔
1171
        *mode = election_mode_by_name(mode_name);
9,430✔
1172
        if (*mode == ELECTION_MODE_INVALID)
9,430✔
1173
                return -1;
2✔
1174
        bool anon = cfg_geti("replication_anon") != 0;
9,428✔
1175
        if (anon && *mode != ELECTION_MODE_OFF) {
9,428✔
1176
                diag_set(ClientError, ER_CFG, "election_mode",
6!
1177
                         "the value may only be set to 'off' when "
1178
                         "'replication_anon' is set to true");
1179
                return -1;
6✔
1180
        }
1181
        return 0;
9,422✔
1182
}
1183

1184
static double
1185
box_check_election_timeout(void)
9,284✔
1186
{
1187
        double d = cfg_getd("election_timeout");
9,284✔
1188
        if (d <= 0) {
9,284✔
1189
                diag_set(ClientError, ER_CFG, "election_timeout",
4!
1190
                         "the value must be a positive number");
1191
                return -1;
4✔
1192
        }
1193
        return d;
9,280✔
1194
}
1195

1196
/**
1197
 * Raises error if election_fencing_mode configuration is incorrect.
1198
 */
1199
static election_fencing_mode
1200
box_check_election_fencing_mode(void)
9,245✔
1201
{
1202
        const char *mode = cfg_gets("election_fencing_mode");
9,245✔
1203
        if (strcmp(mode, "off") == 0)
9,245✔
1204
                return ELECTION_FENCING_MODE_OFF;
197✔
1205
        else if (strcmp(mode, "soft") == 0)
9,048✔
1206
                return ELECTION_FENCING_MODE_SOFT;
9,046✔
1207
        else if (strcmp(mode, "strict") == 0)
2!
1208
                return ELECTION_FENCING_MODE_STRICT;
2✔
1209

1210
        diag_set(ClientError, ER_CFG, "election_fencing_mode",
×
1211
                 "the value must be one of the following strings: "
1212
                 "'off', 'soft', 'strict'");
1213
        return ELECTION_FENCING_MODE_INVALID;
×
1214
}
1215

1216
/** A helper to check validity of a single uri. */
1217
static int
1218
check_uri(const struct uri *uri, const char *option_name, bool set_diag)
15,804✔
1219
{
1220
        const char *auth_type = uri_param(uri, "auth_type", 0);
15,804✔
1221
        const char *errmsg;
1222
        if (uri->service == NULL) {
15,804✔
1223
                errmsg = "expected host:service or /unix.socket";
16✔
1224
                goto bad_uri;
16✔
1225
        }
1226
        if (auth_type != NULL &&
15,790✔
1227
            auth_method_by_name(auth_type, strlen(auth_type)) == NULL) {
2✔
1228
                errmsg = "unknown authentication method";
1✔
1229
                goto bad_uri;
1✔
1230
        }
1231
        return 0;
15,787✔
1232
bad_uri:;
17✔
1233
        if (set_diag) {
17✔
1234
                char *uristr = tt_static_buf();
2✔
1235
                uri_format(uristr, TT_STATIC_BUF_LEN, uri, false);
2✔
1236
                diag_set(ClientError, ER_CFG, option_name,
2!
1237
                         tt_sprintf("bad URI '%s': %s", uristr, errmsg));
1238
        }
1239
        return -1;
17✔
1240
}
1241

1242
/**
1243
 * Check validity of a uri passed in a configuration option.
1244
 * On success stores the uri in @a uri.
1245
 */
1246
static int
1247
box_check_uri(struct uri *uri, const char *option_name, bool set_diag)
39✔
1248
{
1249
        const char *source = cfg_gets(option_name);
39✔
1250
        if (source == NULL) {
39!
1251
                uri_create(uri, NULL);
×
1252
                return 0;
×
1253
        }
1254
        if (uri_create(uri, source) == 0)
39!
1255
                return check_uri(uri, option_name, set_diag);
39✔
1256
        if (set_diag) {
×
1257
                diag_set(ClientError, ER_CFG, option_name,
×
1258
                         tt_sprintf("bad URI '%s': expected host:service or "
1259
                                    "/unix.socket", source));
1260
        }
1261
        return -1;
×
1262
}
1263

1264
/**
1265
 * Check validity of a uri set passed in a configuration option.
1266
 * On success stores the uri set in @a uri_set.
1267
 */
1268
static int
1269
box_check_uri_set(struct uri_set *uri_set, const char *option_name)
18,956✔
1270
{
1271
        if (cfg_get_uri_set(option_name, uri_set) != 0) {
18,956✔
1272
                diag_set(ClientError, ER_CFG, option_name,
58!
1273
                         diag_last_error(diag_get())->errmsg);
1274
                return -1;
58✔
1275
        }
1276
        for (int i = 0; i < uri_set->uri_count; i++) {
34,661✔
1277
                const struct uri *uri = &uri_set->uris[i];
15,765✔
1278
                if (check_uri(uri, option_name, true) != 0) {
15,765✔
1279
                        uri_set_destroy(uri_set);
2✔
1280
                        return -1;
2✔
1281
                }
1282
        }
1283
        return 0;
18,896✔
1284
}
1285

1286
static int
1287
box_check_replication(struct uri_set *uri_set)
9,464✔
1288
{
1289
        return box_check_uri_set(uri_set, "replication");
9,464✔
1290
}
1291

1292
static int
1293
box_check_replication_threads(void)
4,509✔
1294
{
1295
        int count = cfg_geti("replication_threads");
4,509✔
1296
        if (count <= 0 || count > REPLICATION_THREADS_MAX) {
4,509!
1297
                diag_set(ClientError, ER_CFG, "replication_threads",
×
1298
                         tt_sprintf("must be greater than 0, less than or "
1299
                                    "equal to %d", REPLICATION_THREADS_MAX));
1300
                return -1;
×
1301
        }
1302
        return 0;
4,509✔
1303
}
1304

1305
/** Check bootstrap_strategy option validity. */
1306
static enum bootstrap_strategy
1307
box_check_bootstrap_strategy(void)
18,325✔
1308
{
1309
        const char *strategy = cfg_gets("bootstrap_strategy");
18,325✔
1310
        if (strcmp(strategy, "auto") == 0)
18,325✔
1311
                return BOOTSTRAP_STRATEGY_AUTO;
16,540✔
1312
        if (strcmp(strategy, "legacy") == 0)
1,785✔
1313
                return BOOTSTRAP_STRATEGY_LEGACY;
1,665✔
1314
        if (strcmp(strategy, "config") == 0)
120✔
1315
                return BOOTSTRAP_STRATEGY_CONFIG;
79✔
1316
        if (strcmp(strategy, "supervised") == 0)
41!
1317
                return BOOTSTRAP_STRATEGY_SUPERVISED;
41✔
1318
        diag_set(ClientError, ER_CFG, "bootstrap_strategy",
×
1319
                 "the value should be one of the following: "
1320
                 "'auto', 'config', 'supervised', 'legacy'");
1321
        return BOOTSTRAP_STRATEGY_INVALID;
×
1322
}
1323

1324
static int
1325
box_check_listen(struct uri_set *uri_set)
9,492✔
1326
{
1327
        return box_check_uri_set(uri_set, "listen");
9,492✔
1328
}
1329

1330
static double
1331
box_check_replication_timeout(void)
9,426✔
1332
{
1333
        double timeout = cfg_getd("replication_timeout");
9,426✔
1334
        if (timeout <= 0) {
9,426✔
1335
                tnt_raise(ClientError, ER_CFG, "replication_timeout",
2!
1336
                          "the value must be greather than 0");
1337
        }
1338
        return timeout;
9,424✔
1339
}
1340

1341
static double
1342
box_check_replication_connect_timeout(void)
9,346✔
1343
{
1344
        double timeout = cfg_getd("replication_connect_timeout");
9,346✔
1345
        if (timeout <= 0) {
9,346✔
1346
                tnt_raise(ClientError, ER_CFG, "replication_connect_timeout",
2!
1347
                          "the value must be greather than 0");
1348
        }
1349
        return timeout;
9,344✔
1350
}
1351

1352
static int
1353
box_check_replication_connect_quorum(void)
4,938✔
1354
{
1355
        int quorum = cfg_geti_default("replication_connect_quorum",
4,938✔
1356
                                      REPLICATION_CONNECT_QUORUM_ALL);
1357
        if (quorum < 0) {
4,938✔
1358
                tnt_raise(ClientError, ER_CFG, "replication_connect_quorum",
1!
1359
                          "the value must be greater or equal to 0");
1360
        }
1361
        return quorum;
4,937✔
1362
}
1363

1364
static double
1365
box_check_replication_sync_lag(void)
9,337✔
1366
{
1367
        double lag = cfg_getd_default("replication_sync_lag", TIMEOUT_INFINITY);
9,337✔
1368
        if (lag <= 0) {
9,337✔
1369
                tnt_raise(ClientError, ER_CFG, "replication_sync_lag",
2!
1370
                          "the value must be greater than 0");
1371
        }
1372
        return lag;
9,335✔
1373
}
1374

1375
/**
1376
 * Evaluate the replication synchro quorum number from a formula.
1377
 */
1378
static int
1379
box_eval_replication_synchro_quorum(int nr_replicas)
290,202✔
1380
{
1381
        assert(nr_replicas > 0 && nr_replicas < VCLOCK_MAX);
290,202!
1382

1383
        const char loadable[] =
290,202✔
1384
                "local expr, N = ...\n"
1385
                "local f, err = loadstring('return ('..expr..')')\n"
1386
                "if not f then "
1387
                        "error(string.format('Failed to load \%\%s:"
1388
                        "\%\%s', expr, err)) "
1389
                "end\n"
1390
                "setfenv(f, {N = N, math = {"
1391
                        "ceil = math.ceil,"
1392
                        "floor = math.floor,"
1393
                        "abs = math.abs,"
1394
                        "random = math.random,"
1395
                        "min = math.min,"
1396
                        "max = math.max,"
1397
                        "sqrt = math.sqrt,"
1398
                        "fmod = math.fmod,"
1399
                "}})\n"
1400
                "local res = f()\n"
1401
                "if type(res) ~= 'number' then\n"
1402
                        "error('Expression should return a number')\n"
1403
                "end\n"
1404
                "return math.floor(res)\n";
1405
        const char *expr = cfg_gets("replication_synchro_quorum");
290,202!
1406

1407
        luaL_loadstring(tarantool_L, loadable);
290,202!
1408
        lua_pushstring(tarantool_L, expr);
290,202!
1409
        lua_pushinteger(tarantool_L, nr_replicas);
290,202!
1410

1411
        if (lua_pcall(tarantool_L, 2, 1, 0) != 0) {
290,202!
1412
                diag_set(ClientError, ER_CFG,
3!
1413
                         "replication_synchro_quorum",
1414
                         lua_tostring(tarantool_L, -1));
1415
                return -1;
3✔
1416
        }
1417

1418
        int64_t quorum = -1;
290,199✔
1419
        if (lua_isnumber(tarantool_L, -1))
290,199!
1420
                quorum = luaL_toint64(tarantool_L, -1);
290,199!
1421
        lua_pop(tarantool_L, 1);
290,199!
1422

1423
        /*
1424
         * We need at least one node to sync with. Weird
1425
         * formulas such as N-2 do not guarantee a quorum, so
1426
         * return an error.
1427
         */
1428
        if (quorum <= 0 || quorum >= VCLOCK_MAX) {
290,199✔
1429
                const char *msg =
1430
                        tt_sprintf("the formula is evaluated "
8✔
1431
                                   "to the quorum %lld for replica "
1432
                                   "number %d, which is out of range "
1433
                                   "[%d;%d]", (long long)quorum,
1434
                                   nr_replicas, 1, VCLOCK_MAX - 1);
1435
                diag_set(ClientError, ER_CFG,
8!
1436
                         "replication_synchro_quorum", msg);
1437
                return -1;
8✔
1438
        }
1439

1440
        return quorum;
290,191✔
1441
}
1442

1443
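/** Check 'replication_synchro_quorum' validity (plain number or formula). */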
static int
1444
box_check_replication_synchro_quorum(void)
9,709✔
1445
{
1446
        if (!cfg_isnumber("replication_synchro_quorum")) {
9,709✔
1447
                /*
1448
                 * The formula uses symbolic name 'N' as
1449
                 * a number of currently registered replicas.
1450
                 *
1451
                 * When we're in "checking" mode we should walk
1452
                 * over all possible number of replicas to make
1453
                 * sure the formula is correct.
1454
                 *
1455
                 * Note that currently VCLOCK_MAX is pretty small
1456
                 * value but if we gonna increase this limit make
1457
                 * sure that the cycle won't take too much time.
1458
                 */
1459
                for (int i = 1; i < VCLOCK_MAX; i++) {
284,871✔
1460
                        if (box_eval_replication_synchro_quorum(i) < 0)
275,971✔
1461
                                return -1;
11✔
1462
                }
1463
                return 0;
8,900✔
1464
        }
1465

1466
        int64_t quorum = cfg_geti64("replication_synchro_quorum");
798✔
1467
        if (quorum <= 0 || quorum >= VCLOCK_MAX) {
798✔
1468
                diag_set(ClientError, ER_CFG, "replication_synchro_quorum",
6!
1469
                         "the value must be greater than zero and less than "
1470
                         "maximal number of replicas");
1471
                return -1;
6✔
1472
        }
1473
        return 0;
792✔
1474
}
1475

1476
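/** Check that 'replication_synchro_timeout' is greater than 0. */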
static double
1477
box_check_replication_synchro_timeout(void)
9,629✔
1478
{
1479
        double timeout = cfg_getd("replication_synchro_timeout");
9,629✔
1480
        if (timeout <= 0) {
9,629✔
1481
                diag_set(ClientError, ER_CFG, "replication_synchro_timeout",
4!
1482
                         "the value must be greater than zero");
1483
                return -1;
4✔
1484
        }
1485
        return timeout;
9,625✔
1486
}
1487

1488
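/** Check that 'replication_sync_timeout' is non-negative; raises on error. */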
static double
1489
box_check_replication_sync_timeout(void)
9,340✔
1490
{
1491
        double timeout = cfg_getd("replication_sync_timeout");
9,340✔
1492
        if (timeout < 0) {
9,340✔
1493
                tnt_raise(ClientError, ER_CFG, "replication_sync_timeout",
1!
1494
                          "the value must be greater than or equal to 0");
1495
        }
1496
        return timeout;
9,339✔
1497
}
1498

1499
/** Check validity of a uuid passed in a configuration option. */
1500
static inline int
1501
box_check_uuid(struct tt_uuid *uuid, const char *name, bool set_diag)
17,942✔
1502
{
1503
        *uuid = uuid_nil;
17,942✔
1504
        const char *uuid_str = cfg_gets(name);
17,942✔
1505
        if (uuid_str == NULL)
17,942✔
1506
                return 0;
17,614✔
1507
        if (tt_uuid_from_string(uuid_str, uuid) != 0) {
328✔
1508
                if (set_diag)
11✔
1509
                        diag_set(ClientError, ER_CFG, name, uuid_str);
2!
1510
                return -1;
11✔
1511
        }
1512
        if (tt_uuid_is_nil(uuid)) {
317✔
1513
                if (set_diag) {
2!
1514
                        diag_set(ClientError, ER_CFG, name,
2!
1515
                                 tt_sprintf("nil UUID is reserved"));
1516
                }
1517
                return -1;
2✔
1518
        }
1519
        return 0;
315✔
1520
}
1521

1522
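/** Check that 'read_only' is consistent with 'replication_anon'. */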
static bool
1523
box_check_ro(void)
4,811✔
1524
{
1525
        bool ro = cfg_geti("read_only") != 0;
4,811✔
1526
        bool anon = cfg_geti("replication_anon") != 0;
4,811✔
1527
        if (anon && !ro) {
4,811✔
1528
                tnt_raise(ClientError, ER_CFG, "read_only",
1!
1529
                          "the value may be set to false only when "
1530
                          "replication_anon is false");
1531
        }
1532
        return ro;
4,810✔
1533
}
1534

1535
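/** Check 'replication_anon' consistency with 'read_only' and 'election_mode'. */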
static bool
1536
box_check_replication_anon(void)
4,829✔
1537
{
1538
        bool anon = cfg_geti("replication_anon") != 0;
4,829✔
1539
        bool ro = cfg_geti("read_only") != 0;
4,829✔
1540
        const char *mode_name = cfg_gets("election_mode");
4,829✔
1541
        enum election_mode mode = election_mode_by_name(mode_name);
4,829✔
1542
        if (mode == ELECTION_MODE_INVALID)
4,829!
1543
                diag_raise();
×
1544
        if (anon && !ro) {
4,829!
1545
                tnt_raise(ClientError, ER_CFG, "replication_anon",
×
1546
                          "the value may be set to true only when "
1547
                          "the instance is read-only");
1548
        }
1549
        if (anon && mode != ELECTION_MODE_OFF) {
4,829✔
1550
                tnt_raise(ClientError, ER_CFG, "replication_anon",
3!
1551
                          "the value may be set to true only when "
1552
                          "'election_mode' is set to 'off'");
1553
        }
1554
        return anon;
4,826✔
1555
}
1556

1557
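/** Check that 'replication_anon_ttl' is greater than 0. */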
static double
1558
box_check_replication_anon_ttl(void)
9,227✔
1559
{
1560
        double ttl = cfg_getd("replication_anon_ttl");
9,227✔
1561
        if (ttl <= 0) {
9,227!
1562
                diag_set(ClientError, ER_CFG, "replication_anon_ttl",
×
1563
                         "the value must be greater than 0");
1564
                return -1;
×
1565
        }
1566
        return ttl;
9,227✔
1567
}
1568

1569
static int
1570
box_check_instance_uuid(struct tt_uuid *uuid)
8,997✔
1571
{
1572
        return box_check_uuid(uuid, "instance_uuid", true);
8,997✔
1573
}
1574

1575
/** Fetch an optional node name from the config. */
1576
static int
1577
box_check_node_name(char *out, const char *cfg_name, bool set_diag)
15,676✔
1578
{
1579
        if (schema_check_feature(SCHEMA_FEATURE_PERSISTENT_NAMES) != 0)
15,676✔
1580
                return -1;
1✔
1581
        const char *name = cfg_gets(cfg_name);
15,675✔
1582
        if (name == NULL) {
15,675✔
1583
                *out = 0;
13,538✔
1584
                return 0;
13,538✔
1585
        }
1586
        /* Nil name is allowed as Lua box.NULL or nil. Not as "". */
1587
        if (!node_name_is_valid(name)) {
2,137✔
1588
                if (set_diag) {
18!
1589
                        diag_set(ClientError, ER_CFG, cfg_name,
18!
1590
                                 "expected a valid name");
1591
                }
1592
                return -1;
18✔
1593
        }
1594
        strlcpy(out, name, NODE_NAME_SIZE_MAX);
2,119✔
1595
        return 0;
2,119✔
1596
}
1597

1598
static int
1599
box_check_instance_name(char *out)
6,386✔
1600
{
1601
        return box_check_node_name(out, "instance_name", true);
6,386✔
1602
}
1603

1604
static int
1605
box_check_replicaset_uuid(struct tt_uuid *uuid)
8,930✔
1606
{
1607
        return box_check_uuid(uuid, "replicaset_uuid", true);
8,930✔
1608
}
1609

1610
/** Check bootstrap_leader option validity. */
1611
static int
1612
box_check_bootstrap_leader(struct uri *uri, struct tt_uuid *uuid, char *name)
8,985✔
1613
{
1614
        *uuid = uuid_nil;
8,985✔
1615
        if (!uri_is_nil(uri))
8,985✔
1616
                uri_destroy(uri);
2✔
1617
        uri_create(uri, NULL);
8,985✔
1618
        *name = '\0';
8,985✔
1619
        const char *source = cfg_gets("bootstrap_leader");
8,985✔
1620
        enum bootstrap_strategy strategy = box_check_bootstrap_strategy();
8,985✔
1621
        if (strategy != BOOTSTRAP_STRATEGY_CONFIG) {
8,985✔
1622
                if (source == NULL) {
8,945✔
1623
                        /* Nothing to do. */
1624
                        return 0;
8,944✔
1625
                }
1626
                diag_set(ClientError, ER_CFG, "bootstrap_leader",
1!
1627
                         "the option takes no effect when bootstrap strategy "
1628
                         "is not 'config'");
1629
                return -1;
1✔
1630
        } else if (source == NULL) {
40✔
1631
                diag_set(ClientError, ER_CFG, "bootstrap_leader",
1!
1632
                         "the option can't be empty when bootstrap strategy "
1633
                         "is 'config'");
1634
                return -1;
1✔
1635
        }
1636
        if (box_check_uri(uri, "bootstrap_leader", false) == 0)
39✔
1637
                return 0;
24✔
1638
        /* Not a uri. Try uuid then. */
1639
        if (box_check_uuid(uuid, "bootstrap_leader", false) == 0)
15✔
1640
                return 0;
6✔
1641
        if (box_check_node_name(name, "bootstrap_leader", false) == 0)
9!
1642
                return 0;
9✔
1643
        diag_set(ClientError, ER_CFG, "bootstrap_leader",
×
1644
                 "the value must be either a uri, a uuid or a name");
1645
        return -1;
×
1646
}
1647

1648
static int
1649
box_check_replicaset_name(char *out)
4,816✔
1650
{
1651
        return box_check_node_name(out, "replicaset_name", true);
4,816✔
1652
}
1653

1654
static int
1655
box_check_cluster_name(char *out)
4,465✔
1656
{
1657
        return box_check_node_name(out, "cluster_name", true);
4,465✔
1658
}
1659

1660
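/** Convert the 'wal_mode' option string to enum wal_mode; raises if unknown. */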
static enum wal_mode
1661
box_check_wal_mode(const char *mode_name)
8,981✔
1662
{
1663
        assert(mode_name != NULL); /* checked in Lua */
8,981!
1664
        int mode = strindex(wal_mode_STRS, mode_name, WAL_MODE_MAX);
8,981✔
1665
        if (mode == WAL_MODE_MAX)
8,981✔
1666
                tnt_raise(ClientError, ER_CFG, "wal_mode", mode_name);
1!
1667
        return (enum wal_mode) mode;
8,980✔
1668
}
1669

1670
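/** Check 'wal_queue_max_size' option validity (0 means unlimited). */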
static int64_t
1671
box_check_wal_queue_max_size(void)
9,364✔
1672
{
1673
        int64_t size = cfg_geti64("wal_queue_max_size");
9,364✔
1674
        if (size < 0) {
9,364✔
1675
                diag_set(ClientError, ER_CFG, "wal_queue_max_size",
1!
1676
                         "wal_queue_max_size must be >= 0");
1677
        }
1678
        /* Unlimited. */
1679
        if (size == 0)
9,364!
1680
                size = INT64_MAX;
×
1681
        return size;
9,364✔
1682
}
1683

1684
/** Check replication_synchro_queue_max_size option validity. */
1685
static int64_t
1686
box_check_replication_synchro_queue_max_size(void)
12,940✔
1687
{
1688
        int64_t size = cfg_geti64("replication_synchro_queue_max_size");
12,940✔
1689
        if (size < 0) {
12,940✔
1690
                diag_set(ClientError, ER_CFG,
1!
1691
                         "replication_synchro_queue_max_size",
1692
                         "replication_synchro_queue_max_size must be >= 0");
1693
        }
1694
        /* Unlimited. */
1695
        if (size == 0)
12,940✔
1696
                size = INT64_MAX;
2✔
1697
        return size;
12,940✔
1698
}
1699

1700
static double
1701
box_check_wal_cleanup_delay(void)
4,587✔
1702
{
1703
        double value = cfg_getd("wal_cleanup_delay");
4,587✔
1704
        if (value < 0) {
4,587!
1705
                diag_set(ClientError, ER_CFG, "wal_cleanup_delay",
×
1706
                         "value must be >= 0");
1707
                return -1;
×
1708
        }
1709

1710
        return value;
4,587✔
1711
}
1712

1713
static void
1714
box_check_readahead(int readahead)
9,331✔
1715
{
1716
        enum { READAHEAD_MIN = 128, READAHEAD_MAX = 2147483647 };
1717
        if (readahead < (int) READAHEAD_MIN ||
9,331!
1718
            readahead > (int) READAHEAD_MAX) {
1719
                tnt_raise(ClientError, ER_CFG, "readahead",
×
1720
                          "specified value is out of bounds");
1721
        }
1722
}
9,331✔
1723

1724
static void
1725
box_check_checkpoint_count(int checkpoint_count)
9,273✔
1726
{
1727
        if (checkpoint_count < 1) {
9,273✔
1728
                tnt_raise(ClientError, ER_CFG, "checkpoint_count",
1!
1729
                          "the value must not be less than one");
1730
        }
1731
}
9,272✔
1732

1733
static int64_t
1734
box_check_wal_max_size(int64_t wal_max_size)
8,981✔
1735
{
1736
        /* check wal_max_bytes configuration */
1737
        if (wal_max_size <= 1) {
8,981!
1738
                tnt_raise(ClientError, ER_CFG, "wal_max_size",
×
1739
                          "the value must be greater than one");
1740
        }
1741
        return wal_max_size;
8,981✔
1742
}
1743

1744
/** Validate that wal_retention_period is >= 0. */
1745
static double
1746
box_check_wal_retention_period()
8,978✔
1747
{
1748
        double value = cfg_getd("wal_retention_period");
8,978✔
1749
        if (value < 0) {
8,978!
1750
                diag_set(ClientError, ER_CFG, "wal_retention_period",
×
1751
                         "the value must be >= 0");
1752
                return -1;
×
1753
        }
1754
        return value;
8,978✔
1755
}
1756

1757
/** Validate wal_retention_period and raise error, if needed. */
1758
static double
1759
box_check_wal_retention_period_xc()
4,474✔
1760
{
1761
        double value = box_check_wal_retention_period();
4,474✔
1762
        if (value < 0)
4,474!
1763
                diag_raise();
×
1764
        return value;
4,474✔
1765
}
1766

1767
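/** Check that a memory quota option is within [0, QUOTA_MAX]. */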
static ssize_t
1768
box_check_memory_quota(const char *quota_name)
9,708✔
1769
{
1770
        int64_t size = cfg_geti64(quota_name);
9,708✔
1771
        if (size >= 0 && (size_t) size <= QUOTA_MAX)
9,708✔
1772
                return size;
9,702✔
1773
        diag_set(ClientError, ER_CFG, quota_name,
6!
1774
                 tt_sprintf("must be >= 0 and <= %zu, but it is %lld",
1775
                            QUOTA_MAX, (long long)size));
1776
        return -1;
6✔
1777
}
1778

1779
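/** Check validity of the vinyl_* configuration options. */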
static void
1780
box_check_vinyl_options(void)
4,496✔
1781
{
1782
        int read_threads = cfg_geti("vinyl_read_threads");
4,496✔
1783
        int write_threads = cfg_geti("vinyl_write_threads");
4,496✔
1784
        int64_t range_size = cfg_geti64("vinyl_range_size");
4,496✔
1785
        int64_t page_size = cfg_geti64("vinyl_page_size");
4,496✔
1786
        int run_count_per_level = cfg_geti("vinyl_run_count_per_level");
4,496✔
1787
        double run_size_ratio = cfg_getd("vinyl_run_size_ratio");
4,496✔
1788
        double bloom_fpr = cfg_getd("vinyl_bloom_fpr");
4,496✔
1789

1790
        if (box_check_memory_quota("vinyl_memory") < 0)
4,496✔
1791
                diag_raise();
1✔
1792

1793
        if (read_threads < 1) {
4,495✔
1794
                tnt_raise(ClientError, ER_CFG, "vinyl_read_threads",
1!
1795
                          "must be greater than or equal to 1");
1796
        }
1797
        if (write_threads < 2) {
4,494✔
1798
                tnt_raise(ClientError, ER_CFG, "vinyl_write_threads",
1!
1799
                          "must be greater than or equal to 2");
1800
        }
1801
        if (page_size <= 0 || (range_size > 0 && page_size > range_size)) {
4,493✔
1802
                tnt_raise(ClientError, ER_CFG, "vinyl_page_size",
2!
1803
                          "must be greater than 0 and less than "
1804
                          "or equal to vinyl_range_size");
1805
        }
1806
        if (run_count_per_level <= 0) {
4,491✔
1807
                tnt_raise(ClientError, ER_CFG, "vinyl_run_count_per_level",
1!
1808
                          "must be greater than 0");
1809
        }
1810
        if (run_size_ratio <= 1) {
4,490✔
1811
                tnt_raise(ClientError, ER_CFG, "vinyl_run_size_ratio",
1!
1812
                          "must be greater than 1");
1813
        }
1814
        if (bloom_fpr <= 0 || bloom_fpr > 1) {
4,489✔
1815
                tnt_raise(ClientError, ER_CFG, "vinyl_bloom_fpr",
2!
1816
                          "must be greater than 0 and less than or equal to 1");
1817
        }
1818
}
4,487✔
1819

1820
static int
1821
box_check_sql_cache_size(int size)
13,692✔
1822
{
1823
        if (size < 0) {
13,692!
1824
                diag_set(ClientError, ER_CFG, "sql_cache_size",
×
1825
                         "must be non-negative");
1826
                return -1;
×
1827
        }
1828
        return 0;
13,692✔
1829
}
1830

1831
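/** Check that 'memtx_allocator' is either "small" or "system". */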
static int
1832
box_check_allocator(void)
4,498✔
1833
{
1834
        const char *allocator = cfg_gets("memtx_allocator");
4,498✔
1835
        if (strcmp(allocator, "small") && strcmp(allocator, "system")) {
4,498!
1836
                diag_set(ClientError, ER_CFG, "memtx_allocator",
×
1837
                         tt_sprintf("must be small or system, "
1838
                                    "but was set to %s", allocator));
1839
                return -1;
×
1840
        }
1841
        return 0;
4,498✔
1842
}
1843

1844
static void
1845
box_check_small_alloc_options(void)
4,498✔
1846
{
1847
        /*
1848
         * If we use the int type, we may get an incorrect
1849
         * result if the user enters a large value.
1850
         */
1851
        int64_t granularity = cfg_geti64("slab_alloc_granularity");
4,498✔
1852
        /*
1853
         * Granularity must be a power of two and >= 4.
1854
         * We can use granularity value == 4 only because we used small
1855
         * memory allocator only for struct tuple, which doesn't require
1856
         * alignment. We also added an upper bound for granularity, since if
1857
         * the user enters too large a value, they will get incomprehensible
1858
         * errors later.
1859
         */
1860
        if (granularity < 4 || granularity > 1024 * 16 ||
8,995!
1861
            ! is_exp_of_two(granularity))
4,497✔
1862
                tnt_raise(ClientError, ER_CFG, "slab_alloc_granularity",
2!
1863
                          "must be greater than or equal to 4,"
1864
                          " less than or equal"
1865
                          " to 1024 * 16 and exponent of two");
1866
}
4,496✔
1867

1868
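/** Check that 'iproto_threads' is within [1, IPROTO_THREADS_MAX]. */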
static int
1869
box_check_iproto_options(void)
4,487✔
1870
{
1871
        int iproto_threads = cfg_geti("iproto_threads");
4,487✔
1872
        if (iproto_threads <= 0 || iproto_threads > IPROTO_THREADS_MAX) {
4,487✔
1873
                diag_set(ClientError, ER_CFG, "iproto_threads",
3!
1874
                          tt_sprintf("must be greater than or equal to 0,"
1875
                                     " less than or equal to %d",
1876
                                     IPROTO_THREADS_MAX));
1877
                return -1;
3✔
1878
        }
1879
        return 0;
4,484✔
1880
}
1881

1882
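/** Check that 'txn_timeout' is greater than 0. */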
static double
1883
box_check_txn_timeout(void)
9,201✔
1884
{
1885
        double timeout = cfg_getd_default("txn_timeout", TIMEOUT_INFINITY);
9,201✔
1886
        if (timeout <= 0) {
9,201✔
1887
                diag_set(ClientError, ER_CFG, "txn_timeout",
2!
1888
                         "the value must be greather than 0");
1889
                return -1;
2✔
1890
        }
1891
        return timeout;
9,199✔
1892
}
1893

1894
/**
1895
 * Get and check isolation level from config, converting number or string to
1896
 * enum txn_isolation_level.
1897
 * @return isolation level or txn_isolation_level_MAX in case of error.
1898
 */
1899
static enum txn_isolation_level
1900
box_check_txn_isolation(void)
9,269✔
1901
{
1902
        uint32_t level;
1903
        if (cfg_isnumber("txn_isolation")) {
9,269✔
1904
                level = cfg_geti("txn_isolation");
26✔
1905
        } else {
1906
                const char *str_level = cfg_gets("txn_isolation");
9,243✔
1907
                level = strindex(txn_isolation_level_strs, str_level,
9,243✔
1908
                                 txn_isolation_level_MAX);
1909
                if (level == txn_isolation_level_MAX)
9,243✔
1910
                        level = strindex(txn_isolation_level_aliases, str_level,
9,240✔
1911
                                         txn_isolation_level_MAX);
1912
        }
1913
        if (level >= txn_isolation_level_MAX) {
9,269✔
1914
                diag_set(ClientError, ER_CFG, "txn_isolation",
4!
1915
                         "must be one of "
1916
                         "box.txn_isolation_level (keys or values)");
1917
                return txn_isolation_level_MAX;
4✔
1918
        }
1919
        if (level == TXN_ISOLATION_DEFAULT) {
9,265✔
1920
                diag_set(ClientError, ER_CFG, "txn_isolation",
2!
1921
                         "cannot set default transaction isolation "
1922
                         "to 'default'");
1923
                return txn_isolation_level_MAX;
2✔
1924
        }
1925
        if (level == TXN_ISOLATION_LINEARIZABLE) {
9,263✔
1926
                diag_set(ClientError, ER_CFG, "txn_isolation",
1!
1927
                         "cannot set default transaction isolation "
1928
                         "to 'linearizable'");
1929
                return txn_isolation_level_MAX;
1✔
1930
        }
1931
        return (enum txn_isolation_level)level;
9,262✔
1932
}
1933

1934
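/** Check the logging configuration via the Lua 'log' module; raises on error. */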
static void
1935
box_check_say()
4,534✔
1936
{
1937
        if (luaT_dostring(tarantool_L,
4,534✔
1938
                          "require('log').box_api.cfg_check()") != 0)
4,534✔
1939
                diag_raise();
9✔
1940
}
4,525✔
1941

1942
int
1943
box_init_say()
4,479✔
1944
{
1945
        if (luaT_dostring(tarantool_L, "require('log').box_api.cfg()") != 0)
4,479!
1946
                return -1;
×
1947

1948
        if (cfg_geti("background") && say_set_background() != 0)
4,478!
1949
                return -1;
×
1950

1951
        return 0;
4,478✔
1952
}
1953

1954
/**
1955
 * Checks whether memtx_sort_threads configuration parameter is correct.
1956
 */
1957
static void
1958
box_check_memtx_sort_threads(void)
4,483✔
1959
{
1960
        int num = cfg_geti("memtx_sort_threads");
4,483✔
1961
        /*
1962
         * After high level checks this parameter is either nil or has
1963
         * type 'number'.
1964
         */
1965
        if (cfg_isnumber("memtx_sort_threads") &&
4,486✔
1966
            (num <= 0 || num > TT_SORT_THREADS_MAX))
3✔
1967
                tnt_raise(ClientError, ER_CFG, "memtx_sort_threads",
3!
1968
                          tt_sprintf("must be greater than 0 and less than or"
1969
                                     " equal to %d", TT_SORT_THREADS_MAX));
1970
}
4,480✔
1971

1972
void
1973
box_check_config(void)
4,534✔
1974
{
1975
        struct tt_uuid uuid;
1976
        struct uri uri;
1977
        struct uri_set uri_set;
1978
        char name[NODE_NAME_SIZE_MAX];
1979
        enum election_mode election_mode;
1980
        box_check_say();
4,534✔
1981
        if (audit_log_check_cfg() != 0)
4,525!
1982
                diag_raise();
×
1983
        if (box_check_flightrec() != 0)
4,525!
1984
                diag_raise();
×
1985
        if (box_check_listen(&uri_set) != 0)
4,525!
1986
                diag_raise();
1✔
1987
        uri_set_destroy(&uri_set);
4,524!
1988
        if (box_check_auth_type() == NULL)
4,524!
1989
                diag_raise();
×
1990
        if (box_check_instance_uuid(&uuid) != 0)
4,524!
1991
                diag_raise();
2✔
1992
        if (box_check_replicaset_uuid(&uuid) != 0)
4,522!
1993
                diag_raise();
2✔
1994
        if (box_check_election_mode(&election_mode) != 0)
4,520!
1995
                diag_raise();
3✔
1996
        if (box_check_election_timeout() < 0)
4,517!
1997
                diag_raise();
×
1998
        if (box_check_election_fencing_mode() == ELECTION_FENCING_MODE_INVALID)
4,517!
1999
                diag_raise();
×
2000
        if (box_check_replication(&uri_set) != 0)
4,517!
2001
                diag_raise();
1✔
2002
        uri_set_destroy(&uri_set);
4,516!
2003
        box_check_replication_timeout();
4,516✔
2004
        box_check_replication_connect_timeout();
4,514✔
2005
        box_check_replication_connect_quorum();
4,512✔
2006
        box_check_replication_sync_lag();
4,511✔
2007
        if (box_check_replication_synchro_quorum() != 0)
4,509!
2008
                diag_raise();
×
2009
        if (box_check_replication_synchro_timeout() < 0)
4,509!
2010
                diag_raise();
×
2011
        if (box_check_replication_threads() < 0)
4,509!
2012
                diag_raise();
×
2013
        box_check_replication_sync_timeout();
4,509✔
2014
        if (box_check_replication_anon_ttl() < 0)
4,508!
2015
                diag_raise();
×
2016
        if (box_check_bootstrap_strategy() == BOOTSTRAP_STRATEGY_INVALID)
4,508!
2017
                diag_raise();
×
2018
        uri_create(&uri, NULL);
4,508!
2019
        if (box_check_bootstrap_leader(&uri, &uuid, name) != 0)
4,508!
2020
                diag_raise();
1✔
2021
        uri_destroy(&uri);
4,507!
2022
        box_check_readahead(cfg_geti("readahead"));
4,507!
2023
        box_check_checkpoint_count(cfg_geti("checkpoint_count"));
4,507!
2024
        box_check_wal_max_size(cfg_geti64("wal_max_size"));
4,507!
2025
        box_check_wal_mode(cfg_gets("wal_mode"));
4,507!
2026
        if (box_check_wal_queue_max_size() < 0)
4,506!
2027
                diag_raise();
1✔
2028
        if (box_check_replication_synchro_queue_max_size() < 0)
4,505!
2029
                diag_raise();
1✔
2030
        if (box_check_wal_cleanup_delay() < 0)
4,504!
2031
                diag_raise();
×
2032
        if (box_check_wal_retention_period() < 0)
4,504!
2033
                diag_raise();
×
2034
        if (box_check_memory_quota("memtx_memory") < 0)
4,504!
2035
                diag_raise();
1✔
2036
        box_check_memtx_min_tuple_size(cfg_geti64("memtx_min_tuple_size"));
4,503!
2037
        if (box_check_allocator() != 0)
4,498!
2038
                diag_raise();
×
2039
        box_check_small_alloc_options();
4,498✔
2040
        box_check_vinyl_options();
4,496✔
2041
        if (box_check_iproto_options() != 0)
4,487!
2042
                diag_raise();
3✔
2043
        if (box_check_sql_cache_size(cfg_geti("sql_cache_size")) != 0)
4,484!
2044
                diag_raise();
×
2045
        if (box_check_txn_timeout() < 0)
4,484!
2046
                diag_raise();
×
2047
        if (box_check_txn_isolation() == txn_isolation_level_MAX)
4,484!
2048
                diag_raise();
1✔
2049
        box_check_memtx_sort_threads();
4,483✔
2050
}
4,480✔
2051

2052
int
2053
box_set_auth_type(void)
4,817✔
2054
{
2055
        const struct auth_method *method = box_check_auth_type();
4,817✔
2056
        if (method == NULL)
4,817✔
2057
                return -1;
1✔
2058
        box_auth_type = method->name;
4,816✔
2059
        return 0;
4,816✔
2060
}
2061

2062
int
2063
box_set_election_mode(void)
4,910✔
2064
{
2065
        enum election_mode mode;
2066
        if (box_check_election_mode(&mode) != 0)
4,910!
2067
                return -1;
5✔
2068
        box_raft_cfg_election_mode(mode);
4,905!
2069
        box_broadcast_ballot();
4,905!
2070
        return 0;
4,905✔
2071
}
2072

2073
int
2074
box_set_election_timeout(void)
4,767✔
2075
{
2076
        double d = box_check_election_timeout();
4,767✔
2077
        if (d < 0)
4,767✔
2078
                return -1;
4✔
2079
        raft_cfg_election_timeout(box_raft(), d);
4,763✔
2080
        return 0;
4,763✔
2081
}
2082

2083
int
2084
box_set_election_fencing_mode(void)
4,728✔
2085
{
2086
        enum election_fencing_mode mode = box_check_election_fencing_mode();
4,728✔
2087
        if (mode == ELECTION_FENCING_MODE_INVALID)
4,728!
2088
                return -1;
×
2089
        box_raft_set_election_fencing_mode(mode);
4,728✔
2090
        return 0;
4,728✔
2091
}
2092

2093
/*
2094
 * Sync box.cfg.replication with the cluster registry, but
2095
 * don't start appliers.
2096
 */
2097
static void
2098
box_sync_replication(bool do_quorum, bool do_reuse)
9,074✔
2099
{
2100
        struct uri_set uri_set;
2101
        int rc = cfg_get_uri_set("replication", &uri_set);
9,074!
2102
        assert(rc == 0);
9,074!
2103
        (void)rc;
2104
        auto uri_set_guard = make_scoped_guard([&]{
9,074✔
2105
                uri_set_destroy(&uri_set);
9,074✔
2106
        });
18,148!
2107
        replicaset_connect(&uri_set, do_quorum, do_reuse);
9,074✔
2108
}
9,055✔
2109

2110
static inline void
2111
box_restart_replication(void)
3,661✔
2112
{
2113
        const bool do_quorum = true;
3,661✔
2114
        const bool do_reuse = false;
3,661✔
2115
        box_sync_replication(do_quorum, do_reuse);
3,661✔
2116
}
3,645✔
2117

2118
static inline void
2119
box_update_replication(void)
5,413✔
2120
{
2121
        /*
2122
         * In legacy mode proceed as soon as `replication_connect_quorum` remote
2123
         * peers are connected.
2124
         * In every other mode, try to connect to everyone during the given time
2125
         * period, but do not fail even if no connections were established.
2126
         */
2127
        const bool do_quorum = bootstrap_strategy != BOOTSTRAP_STRATEGY_LEGACY;
5,413✔
2128
        const bool do_reuse = true;
5,413✔
2129
        box_sync_replication(do_quorum, do_reuse);
5,413✔
2130
}
5,410✔
2131

2132
void
2133
box_set_replication(void)
4,947✔
2134
{
2135
        if (!is_box_configured) {
4,947!
2136
                /*
2137
                 * Do nothing, we're in local hot standby mode, this instance
2138
                 * will automatically begin following the replica when local
2139
                 * hot standby mode is finished, see box_cfg().
2140
                 */
2141
                return;
374✔
2142
        }
2143
        struct uri_set uri_set;
2144
        if (box_check_replication(&uri_set) != 0)
4,947!
2145
                diag_raise();
2✔
2146
        bool unchanged = uri_set_is_equal(&uri_set, &replication_uris);
4,945!
2147
        uri_set_destroy(&uri_set);
4,945!
2148
        if (unchanged) {
4,945✔
2149
                /*
2150
                 * No need to reconnect or sync in case the configuration is
2151
                 * the same. However, we should still reload the URIs because
2152
                 * a URI parameter may store a path to a file (for example,
2153
                 * an SSL certificate), which could change.
2154
                 */
2155
                replicaset_reload_uris();
374!
2156
                return;
374✔
2157
        }
2158
        /*
2159
         * Try to connect to all replicas within the timeout period.
2160
         * Stay in orphan mode in case we fail to connect to at least
2161
         * 'replication_connect_quorum' remote instances.
2162
         */
2163
        box_update_replication();
4,571✔
2164
        /* Follow replica */
2165
        replicaset_follow();
4,568!
2166
        /* Wait until appliers are in sync */
2167
        replicaset_sync();
4,568!
2168
}
2169

2170
void
2171
box_set_replication_timeout(void)
4,910✔
2172
{
2173
        replication_timeout = box_check_replication_timeout();
4,910✔
2174
        raft_cfg_death_timeout(box_raft(), replication_disconnect_timeout());
4,910✔
2175
        box_update_broadcast_ballot_interval(replication_timeout);
4,910✔
2176
}
4,910✔
2177

2178
void
2179
box_set_replication_connect_timeout(void)
4,832✔
2180
{
2181
        replication_connect_timeout = box_check_replication_connect_timeout();
4,832✔
2182
}
4,832✔
2183

2184
void
2185
box_set_replication_connect_quorum(void)
426✔
2186
{
2187
        replication_connect_quorum = box_check_replication_connect_quorum();
426✔
2188
        if (is_box_configured)
426✔
2189
                replicaset_check_quorum();
14✔
2190
}
426✔
2191

2192
int
2193
box_set_bootstrap_strategy(void)
4,832✔
2194
{
2195
        enum bootstrap_strategy strategy = box_check_bootstrap_strategy();
4,832✔
2196
        if (strategy == BOOTSTRAP_STRATEGY_INVALID)
4,832!
2197
                return -1;
×
2198
        bootstrap_strategy = strategy;
4,832✔
2199
        return 0;
4,832✔
2200
}
2201

2202
int
2203
box_set_bootstrap_leader(void)
4,477✔
2204
{
2205
        return box_check_bootstrap_leader(&cfg_bootstrap_leader_uri,
4,477✔
2206
                                          &cfg_bootstrap_leader_uuid,
2207
                                          cfg_bootstrap_leader_name);
4,477✔
2208
}
2209

2210
/** Persist this instance as the bootstrap leader in _schema space. */
2211
static int
2212
box_set_bootstrap_leader_record(void)
4✔
2213
{
2214
        assert(instance_id != REPLICA_ID_NIL);
4!
2215
        assert(!tt_uuid_is_nil(&INSTANCE_UUID));
4!
2216
        return boxk(IPROTO_REPLACE, BOX_SCHEMA_ID,
4✔
2217
                    "[%s%s%" PRIu64 "%" PRIu32 "]", "bootstrap_leader_uuid",
2218
                    tt_uuid_str(&INSTANCE_UUID), fiber_time64(), instance_id);
4✔
2219
}
2220

2221
int
2222
box_make_bootstrap_leader(void)
4✔
2223
{
2224
        if (tt_uuid_is_nil(&INSTANCE_UUID)) {
4!
2225
                diag_set(ClientError, ER_UNSUPPORTED,
×
2226
                         "box.ctl.make_bootstrap_leader()",
2227
                         "promoting this instance before box.cfg() is called");
2228
                return -1;
×
2229
        }
2230
        /* Bootstrap strategy is read by the time instance uuid is known. */
2231
        assert(bootstrap_strategy != BOOTSTRAP_STRATEGY_INVALID);
4!
2232
        if (bootstrap_strategy != BOOTSTRAP_STRATEGY_SUPERVISED) {
4!
2233
                diag_set(ClientError, ER_UNSUPPORTED,
×
2234
                         tt_sprintf("bootstrap_strategy = '%s'",
2235
                                    cfg_gets("bootstrap_strategy")),
2236
                         "promoting the bootstrap leader via "
2237
                         "box.ctl.make_bootstrap_leader()");
2238
                return -1;
×
2239
        }
2240
        if (is_box_configured) {
4✔
2241
                if (box_check_writable() != 0)
2!
2242
                        return -1;
×
2243
                /* Ballot broadcast will happen in an on_commit trigger. */
2244
                return box_set_bootstrap_leader_record();
2✔
2245
        } else {
2246
                bootstrap_leader_uuid = INSTANCE_UUID;
2✔
2247
                box_broadcast_ballot();
2✔
2248
                return 0;
2✔
2249
        }
2250
}
2251

2252
void
2253
box_set_replication_sync_lag(void)
4,826✔
2254
{
2255
        replication_sync_lag = box_check_replication_sync_lag();
4,826✔
2256
}
4,826✔
2257

2258
void
2259
box_update_replication_synchro_quorum(void)
15,324✔
2260
{
2261
        int quorum = -1;
15,324✔
2262

2263
        if (!cfg_isnumber("replication_synchro_quorum")) {
15,324✔
2264
                /*
2265
                 * The formula has been verified already. For bootstrap
2266
                 * stage pass 1 as a number of replicas to sync because
2267
                 * we're at early stage and registering a new replica.
2268
                 *
2269
                 * This should cover the valid case where formula is plain
2270
                 * "N", ie all replicas are to be synchro mode.
2271
                 */
2272
                int value = MAX(1, replicaset.registered_count);
14,231✔
2273
                quorum = box_eval_replication_synchro_quorum(value);
14,231✔
2274
                say_info("update replication_synchro_quorum = %d", quorum);
14,231✔
2275
        } else {
2276
                quorum = cfg_geti("replication_synchro_quorum");
1,093✔
2277
        }
2278

2279
        /*
2280
         * This should never happen because the values were
2281
         * validated already but just to prevent from
2282
         * unexpected changes and because the value is too
2283
         * important for qsync, lets re-check (this is cheap).
2284
         */
2285
        if (quorum <= 0 || quorum >= VCLOCK_MAX)
15,324!
2286
                panic("failed to eval/fetch replication_synchro_quorum");
×
2287

2288
        /*
2289
         * Extending replicaset pause fencing until quorum is obtained.
2290
         */
2291
        if (quorum > replication_synchro_quorum &&
15,324✔
2292
            replicaset.healthy_count < quorum)
2,702✔
2293
                box_raft_election_fencing_pause();
2,692✔
2294

2295
        replication_synchro_quorum = quorum;
15,324✔
2296
        txn_limbo_on_parameters_change(&txn_limbo);
15,324✔
2297
        box_raft_update_election_quorum();
15,324✔
2298
        replicaset_on_health_change();
15,324✔
2299
}
15,324✔
2300

2301
int
2302
box_set_replication_synchro_quorum(void)
5,200✔
2303
{
2304
        if (box_check_replication_synchro_quorum() != 0)
5,200✔
2305
                return -1;
17✔
2306
        box_update_replication_synchro_quorum();
5,183✔
2307
        return 0;
5,183✔
2308
}
2309

2310
int
2311
box_set_replication_synchro_timeout(void)
5,120✔
2312
{
2313
        double value = box_check_replication_synchro_timeout();
5,120✔
2314
        if (value < 0)
5,120✔
2315
                return -1;
4✔
2316
        replication_synchro_timeout = value;
5,116✔
2317
        txn_limbo_on_parameters_change(&txn_limbo);
5,116✔
2318
        return 0;
5,116✔
2319
}
2320

2321
void
2322
box_set_replication_sync_timeout(void)
4,831✔
2323
{
2324
        replication_sync_timeout = box_check_replication_sync_timeout();
4,831✔
2325
}
4,831✔
2326

2327
void
2328
box_set_replication_skip_conflict(void)
4,724✔
2329
{
2330
        replication_skip_conflict = cfg_geti("replication_skip_conflict");
4,724✔
2331
}
4,724✔
2332

2333
/** Register on the master instance. Could be initial join or a name change. */
2334
static void
2335
box_register_on_master(void)
22✔
2336
{
2337
        /*
2338
         * Restart the appliers so as they would notice the change (need an ID,
2339
         * need a new name).
2340
         */
2341
        box_restart_replication();
22✔
2342
        struct replica *master = replicaset_find_join_master();
22✔
2343
        if (master == NULL || master->applier == NULL ||
22!
2344
            master->applier->state != APPLIER_CONNECTED) {
19!
2345
                tnt_raise(ClientError, ER_CANNOT_REGISTER);
3!
2346
        } else {
2347
                struct applier *master_applier = master->applier;
19✔
2348
                applier_resume_to_state(master_applier, APPLIER_REGISTERED,
19✔
2349
                                        TIMEOUT_INFINITY);
2350
                applier_resume_to_state(master_applier, APPLIER_READY,
17✔
2351
                                        TIMEOUT_INFINITY);
2352
        }
2353
        replicaset_follow();
17✔
2354
        replicaset_sync();
17✔
2355
}
17✔
2356

2357
void
2358
box_set_replication_anon(void)
355✔
2359
{
2360
        assert(is_box_configured);
355!
2361
        assert(cfg_replication_anon == box_is_anon());
355!
2362
        bool new_anon = box_check_replication_anon();
355✔
2363
        if (new_anon == cfg_replication_anon)
352✔
2364
                return;
342✔
2365
        auto guard = make_scoped_guard([&]{
4✔
2366
                cfg_replication_anon = !new_anon;
4✔
2367
                box_broadcast_ballot();
4✔
2368
        });
14!
2369
        cfg_replication_anon = new_anon;
10✔
2370
        box_broadcast_ballot();
10!
2371
        if (!new_anon) {
10✔
2372
                box_register_on_master();
9✔
2373
                assert(!box_is_anon());
6!
2374
        } else {
2375
                /*
2376
                 * It is forbidden to turn a normal replica into
2377
                 * an anonymous one.
2378
                 */
2379
                tnt_raise(ClientError, ER_CFG, "replication_anon",
1!
2380
                          "cannot be turned on after bootstrap"
2381
                          " has finished");
2382
        }
2383
        guard.is_active = false;
6✔
2384
}
2385

2386
int
2387
box_set_replication_anon_ttl(void)
4,719✔
2388
{
2389
        double ttl = box_check_replication_anon_ttl();
4,719✔
2390
        if (ttl <= 0)
4,719!
2391
                return -1;
×
2392
        replication_anon_ttl = ttl;
4,719✔
2393
        /* The fiber can be NULL on configuration. */
2394
        if (replication_anon_gc_fiber != NULL)
4,719!
2395
                fiber_wakeup(replication_anon_gc_fiber);
4,719✔
2396
        return 0;
4,719✔
2397
}
2398

2399
/**
2400
 * Set the cluster name record in _schema, bypassing all checks like whether the
2401
 * instance is writable. It makes the function usable by bootstrap master when
2402
 * it is read-only but has to make the first registration.
2403
 */
2404
static void
2405
box_set_cluster_name_record(const char *name)
2,905✔
2406
{
2407
        int rc;
2408
        if (*name == 0) {
2,905✔
2409
                rc = boxk(IPROTO_DELETE, BOX_SCHEMA_ID, "[%s]", "cluster_name");
2,876✔
2410
        } else {
2411
                rc = boxk(IPROTO_REPLACE, BOX_SCHEMA_ID, "[%s%s]",
29✔
2412
                          "cluster_name", name);
2413
        }
2414
        if (rc != 0)
2,905✔
2415
                diag_raise();
2✔
2416
}
2,903✔
2417

2418
void
2419
box_set_cluster_name(void)
57✔
2420
{
2421
        char name[NODE_NAME_SIZE_MAX];
2422
        if (box_check_cluster_name(name) != 0)
57!
2423
                diag_raise();
6✔
2424
        /* Nil means the config doesn't care, allows to use any name. */
2425
        if (*name == 0)
51✔
2426
                return;
21✔
2427
        if (strcmp(CLUSTER_NAME, name) == 0)
41✔
2428
                return;
11✔
2429
        box_check_writable_xc();
30✔
2430
        box_set_cluster_name_record(name);
28✔
2431
}
2432

2433
/**
2434
 * Set the new replicaset name record in _schema, bypassing all checks like
2435
 * whether the instance is writable. It makes the function usable by bootstrap
2436
 * master when it is read-only but has to make the first registration.
2437
 */
2438
static void
2439
box_set_replicaset_name_record(const char *name)
2,909✔
2440
{
2441
        int rc;
2442
        if (*name == 0) {
2,909✔
2443
                rc = boxk(IPROTO_DELETE, BOX_SCHEMA_ID, "[%s]",
2,527✔
2444
                          "replicaset_name");
2445
        } else {
2446
                rc = boxk(IPROTO_REPLACE, BOX_SCHEMA_ID, "[%s%s]",
382✔
2447
                          "replicaset_name", name);
2448
        }
2449
        if (rc != 0)
2,909✔
2450
                diag_raise();
2✔
2451
}
2,907✔
2452

2453
void
2454
box_set_replicaset_name(void)
408✔
2455
{
2456
        char name[NODE_NAME_SIZE_MAX];
2457
        if (box_check_replicaset_name(name) != 0)
408!
2458
                diag_raise();
6✔
2459
        /* Nil means the config doesn't care, allows to use any name. */
2460
        if (*name == 0)
402✔
2461
                return;
368✔
2462
        if (strcmp(REPLICASET_NAME, name) == 0)
390✔
2463
                return;
356✔
2464
        box_check_writable_xc();
34✔
2465
        box_set_replicaset_name_record(name);
32✔
2466
}
2467

2468
/**
2469
 * Register a new replica if not already registered. Update its name if needed.
2470
 */
2471
static void
2472
box_register_replica(const struct tt_uuid *uuid,
2473
                     const char *name);
2474

2475
void
2476
box_set_instance_name(void)
381✔
2477
{
2478
        char name[NODE_NAME_SIZE_MAX];
2479
        if (box_check_instance_name(name) != 0)
381!
2480
                diag_raise();
7✔
2481
        if (strcmp(cfg_instance_name, name) == 0)
374✔
2482
                return;
346✔
2483
        /**
2484
         * It's possible that the name is set on the master by a manual replace.
2485
         * Don't make all appliers resubscribe in that case. Just update
2486
         * the saved cfg_instance_name.
2487
         */
2488
        if (strcmp(INSTANCE_NAME, name) == 0) {
46✔
2489
                strlcpy(cfg_instance_name, name, NODE_NAME_SIZE_MAX);
18!
2490
                return;
18✔
2491
        }
2492
        char old_cfg_name[NODE_NAME_SIZE_MAX];
2493
        strlcpy(old_cfg_name, cfg_instance_name, NODE_NAME_SIZE_MAX);
28!
2494
        auto guard = make_scoped_guard([&]{
4✔
2495
                strlcpy(cfg_instance_name, old_cfg_name, NODE_NAME_SIZE_MAX);
4✔
2496
                try {
2497
                        box_restart_replication();
4!
2498
                        replicaset_follow();
4!
2499
                } catch (Exception *exc) {
×
2500
                        exc->log();
×
2501
                } catch (...) {
×
2502
                        panic("Unknown exception on instance name set failure");
×
2503
                }
2504
        });
36!
2505
        strlcpy(cfg_instance_name, name, NODE_NAME_SIZE_MAX);
28!
2506
        /* Nil means the config doesn't care, allows to use any name. */
2507
        if (*name != 0) {
28✔
2508
                if (box_is_ro())
24✔
2509
                        box_register_on_master();
13✔
2510
                else
2511
                        box_register_replica(&INSTANCE_UUID, name);
11✔
2512
        }
2513
        guard.is_active = false;
24✔
2514
}
2515

2516
/** Trigger to catch ACKs from all nodes when need to wait for quorum. */
2517
struct box_quorum_trigger {
2518
        /** Inherit trigger. */
2519
        struct trigger base;
2520
        /** Minimal number of nodes who should confirm the target LSN. */
2521
        int quorum;
2522
        /** Target LSN to wait for. */
2523
        int64_t target_lsn;
2524
        /** Replica ID whose LSN is being waited. */
2525
        uint32_t replica_id;
2526
        /**
2527
         * All versions of the given replica's LSN as seen by other nodes. The
2528
         * same as in the txn limbo.
2529
         */
2530
        struct vclock vclock;
2531
        /** Number of nodes who confirmed the LSN. */
2532
        int ack_count;
2533
        /** Fiber to wakeup when quorum is reached. */
2534
        struct fiber *waiter;
2535
};
2536

2537
static int
2538
box_quorum_on_ack_f(struct trigger *trigger, void *event)
4✔
2539
{
2540
        struct replication_ack *ack = (struct replication_ack *)event;
4✔
2541
        struct box_quorum_trigger *t = (struct box_quorum_trigger *)trigger;
4✔
2542
        int64_t new_lsn = vclock_get(ack->vclock, t->replica_id);
4✔
2543
        int64_t old_lsn = vclock_get(&t->vclock, ack->source);
4✔
2544
        if (new_lsn < t->target_lsn || old_lsn >= t->target_lsn)
4!
2545
                return 0;
×
2546

2547
        vclock_follow(&t->vclock, ack->source, new_lsn);
4✔
2548
        ++t->ack_count;
4✔
2549
        if (t->ack_count >= t->quorum) {
4!
2550
                fiber_wakeup(t->waiter);
4✔
2551
                trigger_clear(trigger);
4✔
2552
        }
2553
        return 0;
4✔
2554
}
2555

2556
/**
2557
 * Wait until at least @a quorum of nodes confirm @a target_lsn from the node
2558
 * with id @a lead_id.
2559
 */
2560
static int
2561
box_wait_quorum(uint32_t lead_id, int64_t target_lsn, int quorum,
48✔
2562
                double timeout)
2563
{
2564
#ifndef NDEBUG
2565
    ++errinj(ERRINJ_WAIT_QUORUM_COUNT, ERRINJ_INT)->iparam;
48!
2566
#endif
2567

2568
        struct box_quorum_trigger t;
2569
        memset(&t, 0, sizeof(t));
48✔
2570
        vclock_create(&t.vclock);
48✔
2571

2572
        /* Take this node into account immediately. */
2573
        int ack_count = vclock_get(box_vclock, lead_id) >= target_lsn;
48✔
2574
        replicaset_foreach(replica) {
170!
2575
                if (relay_get_state(replica->relay) != RELAY_FOLLOW ||
164!
2576
                    replica->anon)
42!
2577
                        continue;
80✔
2578

2579
                assert(replica->id != REPLICA_ID_NIL);
42!
2580
                assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid));
42!
2581

2582
                int64_t lsn = vclock_get(relay_vclock(replica->relay), lead_id);
42!
2583
                /*
2584
                 * The replica might not have received anything from the old
2585
                 * leader yet. This easily happens with a newly added replica.
2586
                 * Vclock can't be followed then, because it would assert on
2587
                 * lsn > old lsn whereas they are both 0.
2588
                 */
2589
                if (lsn == 0)
42✔
2590
                        continue;
1✔
2591
                vclock_follow(&t.vclock, replica->id, lsn);
41!
2592
                if (lsn >= target_lsn) {
41✔
2593
                        ack_count++;
33✔
2594
                        continue;
33✔
2595
                }
2596
        }
2597
        if (ack_count < quorum) {
48✔
2598
                t.quorum = quorum;
6✔
2599
                t.target_lsn = target_lsn;
6✔
2600
                t.replica_id = lead_id;
6✔
2601
                t.ack_count = ack_count;
6✔
2602
                t.waiter = fiber();
6!
2603
                trigger_create(&t.base, box_quorum_on_ack_f, NULL, NULL);
6✔
2604
                trigger_add(&replicaset.on_ack, &t.base);
6✔
2605
                double deadline = ev_monotonic_now(loop()) + timeout;
6!
2606
                do {
×
2607
                        if (fiber_yield_deadline(deadline))
6!
2608
                                break;
2✔
2609
                } while (!fiber_is_cancelled() && t.ack_count < t.quorum);
4!
2610
                trigger_clear(&t.base);
6!
2611
                ack_count = t.ack_count;
6✔
2612
        }
2613
        /*
2614
         * No point in proceeding after cancellation even if we got the quorum. The
2614
         * quorum is awaited by the limbo clear function. Emptying the limbo involves
2616
         * a pair of blocking WAL writes, making the fiber sleep even longer,
2617
         * which isn't appropriate when it's canceled.
2618
         */
2619
        if (fiber_is_cancelled()) {
48!
2620
                diag_set(FiberIsCancelled);
×
2621
                return -1;
×
2622
        }
2623
        if (ack_count < quorum) {
48✔
2624
                diag_set(TimedOut);
2!
2625
                return -1;
2✔
2626
        }
2627
        return 0;
46✔
2628
}
2629

2630
/**
2631
 * The pool is used by box_cc to allocate sync_trigger_data that is used in
2632
 * box_collect_confirmed_vclock and relay_get_sync_on_start. We allocate
2633
 * sync_trigger_data dynamically because these functions are running in
2634
 * different fibers. The lifetime of sync_trigger_data is not limited by the
2635
 * execution time of box_collect_confirmed_vclock.
2636
 */
2637
static struct mempool sync_trigger_data_pool;
2638

2639
/** A structure holding trigger data to collect syncs. */
2640
struct sync_trigger_data {
2641
        /** Syncs to wait for. */
2642
        uint64_t vclock_syncs[VCLOCK_MAX];
2643
        /**
2644
         * A bitmap holding replica ids whose vclocks were already collected.
2645
         */
2646
        vclock_map_t collected_vclock_map;
2647
        /** The fiber waiting for vclock. */
2648
        struct fiber *waiter;
2649
        /** Collected vclock. */
2650
        struct vclock *vclock;
2651
        /** The request deadline. */
2652
        double deadline;
2653
        /** How many vclocks are needed. */
2654
        int count;
2655
        /** Whether the request is timed out. */
2656
        bool is_timed_out;
2657
        /** Count of fibers that are using data. */
2658
        int ref_count;
2659
};
2660

2661
/** Let others know we need data. */
2662
void
2663
sync_trigger_data_ref(struct sync_trigger_data *data)
7✔
2664
{
2665
        ++data->ref_count;
7✔
2666
}
7✔
2667

2668
/**
2669
 * Let others know that we no longer need the data.
2670
 * If no one else needs the data, free it.
2671
 */
2672
void
2673
sync_trigger_data_unref(struct sync_trigger_data *data)
7✔
2674
{
2675
        --data->ref_count;
7✔
2676
        assert(data->ref_count >= 0);
7!
2677
        if (data->ref_count == 0)
7✔
2678
                mempool_free(&sync_trigger_data_pool, data);
6✔
2679
}
7✔
2680

2681
/**
2682
 * A trigger executed on each ack to collect up to date remote node vclocks.
2683
 * When an ack comes with requested sync for some replica id, ack vclock is
2684
 * accounted.
2685
 */
2686
static int
2687
check_vclock_sync_on_ack(struct trigger *trigger, void *event)
21✔
2688
{
2689
        struct replication_ack *ack = (struct replication_ack *)event;
21✔
2690
        struct sync_trigger_data *data =
21✔
2691
                (struct sync_trigger_data *)trigger->data;
2692
        uint32_t id = ack->source;
21✔
2693
        /*
2694
         * Anonymous replica acks are not counted for synchronous transactions,
2695
         * so linearizable read shouldn't count them as well.
2696
         */
2697
        if (id == 0)
21!
2698
                return 0;
×
2699
        uint64_t sync = data->vclock_syncs[id];
21✔
2700
        int accounted_count = bit_count_u32(data->collected_vclock_map);
21✔
2701
        if (!bit_test(&data->collected_vclock_map, id) && sync > 0 &&
42✔
2702
            ack->vclock_sync >= sync && accounted_count < data->count) {
42!
2703
                vclock_max_ignore0(data->vclock, ack->vclock);
7✔
2704
                bit_set(&data->collected_vclock_map, id);
7✔
2705
                ++accounted_count;
7✔
2706
                if (accounted_count >= data->count)
7✔
2707
                        fiber_wakeup(data->waiter);
5✔
2708
        }
2709
        return 0;
21✔
2710
}
2711

2712
/** A trigger querying relay's next sync value once it becomes operational. */
2713
static int
2714
relay_get_sync_on_start(struct trigger *trigger, void *event)
1✔
2715
{
2716
        struct replica *replica = (struct replica *)event;
1✔
2717
        if (replica->anon)
1!
2718
                return 0;
×
2719
        struct relay *relay = replica->relay;
1✔
2720
        struct sync_trigger_data *data =
1✔
2721
                (struct sync_trigger_data *)trigger->data;
2722
        uint32_t id = replica->id;
1✔
2723
        /* Already accounted. */
2724
        if (bit_test(&data->collected_vclock_map, id))
1!
2725
                return 0;
×
2726

2727
        sync_trigger_data_ref(data);
1✔
2728
        if (relay_trigger_vclock_sync(relay, &data->vclock_syncs[id],
1✔
2729
                                      data->deadline) != 0) {
1!
2730
                diag_clear(diag_get());
×
2731
                data->is_timed_out = true;
×
2732
                fiber_wakeup(data->waiter);
×
2733
        }
2734
        sync_trigger_data_unref(data);
1✔
2735
        return 0;
1✔
2736
}
2737

2738
/** Find the minimal vclock which has all the data confirmed on a quorum. */
2739
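/*
 * For example (illustrative numbers): with 5 registered replicas and
 * replication_synchro_quorum = 3, any 5 - 3 + 1 = 3 nodes necessarily
 * intersect every quorum of 3, so collecting 3 vclocks guarantees at
 * least one of them has seen all confirmed data.
 */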
static int
2740
box_collect_confirmed_vclock(struct vclock *confirmed_vclock, double deadline)
16✔
2741
{
2742
        /*
2743
         * How many vclocks we should see to be sure that at least one of them
2744
         * contains all data present on any real quorum.
2745
         */
2746
        int vclock_count = MAX(1, replicaset.registered_count) -
16✔
2747
                           replication_synchro_quorum + 1;
2748
        /*
2749
         * We should check the vclock on self plus vclock_count - 1 remote
2750
         * instances.
2751
         */
2752
        vclock_copy(confirmed_vclock, instance_vclock);
16✔
2753
        if (vclock_count <= 1)
16✔
2754
                return 0;
10✔
2755

2756
        struct sync_trigger_data *data = (sync_trigger_data *)
2757
                xmempool_alloc(&sync_trigger_data_pool);
6!
2758
        memset(data->vclock_syncs, 0, sizeof(data->vclock_syncs));
6✔
2759
        data->collected_vclock_map = 0;
6✔
2760
        data->waiter = fiber();
6!
2761
        data->vclock = confirmed_vclock;
6✔
2762
        data->deadline = deadline;
6✔
2763
        data->count = vclock_count;
6✔
2764
        data->is_timed_out = false;
6✔
2765
        data->ref_count = 0;
6✔
2766

2767
        sync_trigger_data_ref(data);
6!
2768
        bit_set(&data->collected_vclock_map, instance_id);
6✔
2769
        struct trigger on_relay_thread_start;
2770
        trigger_create(&on_relay_thread_start, relay_get_sync_on_start, data,
6✔
2771
                       NULL);
2772
        trigger_add(&replicaset.on_relay_thread_start, &on_relay_thread_start);
6✔
2773
        struct trigger on_ack;
2774
        trigger_create(&on_ack, check_vclock_sync_on_ack, data, NULL);
6✔
2775
        trigger_add(&replicaset.on_ack, &on_ack);
6✔
2776

2777
        auto guard = make_scoped_guard([&] {
6✔
2778
                trigger_clear(&on_ack);
6✔
2779
                trigger_clear(&on_relay_thread_start);
6✔
2780
                sync_trigger_data_unref(data);
6✔
2781
        });
12!
2782

2783
        replicaset_foreach(replica) {
26!
2784
                if (relay_get_state(replica->relay) != RELAY_FOLLOW ||
32!
2785
                    replica->anon) {
11!
2786
                        continue;
10✔
2787
                }
2788
                /* Might already be filled by the on_relay_thread_start trigger. */
2789
                if (data->vclock_syncs[replica->id] != 0)
11!
2790
                        continue;
×
2791
                if (relay_trigger_vclock_sync(replica->relay,
22✔
2792
                                              &data->vclock_syncs[replica->id],
11!
2793
                                              deadline) != 0) {
11✔
2794
                        /* Timed out. */
2795
                        return -1;
1✔
2796
                }
2797
        }
2798

2799
        while (bit_count_u32(data->collected_vclock_map) < vclock_count &&
10✔
2800
               !data->is_timed_out && !fiber_is_cancelled()) {
10!
2801
                if (fiber_yield_deadline(deadline))
5!
2802
                        break;
×
2803
        }
2804

2805
        if (fiber_is_cancelled()) {
5!
2806
                diag_set(FiberIsCancelled);
×
2807
                return -1;
×
2808
        }
2809
        if (bit_count_u32(data->collected_vclock_map) < vclock_count) {
5!
2810
                diag_set(TimedOut);
×
2811
                return -1;
×
2812
        }
2813
        return 0;
5✔
2814
}
2815

2816
/** box_wait_vclock trigger data. */
2817
struct box_wait_vclock_data {
2818
        /** Whether the request is finished. */
2819
        bool is_ready;
2820
        /** The vclock to wait for. */
2821
        const struct vclock *vclock;
2822
        /** The fiber waiting for vclock. */
2823
        struct fiber *waiter;
2824
};
2825

2826
static int
2827
box_wait_vclock_f(struct trigger *trigger, void *event)
×
2828
{
2829
        (void)event;
2830
        struct box_wait_vclock_data *data =
×
2831
                (struct box_wait_vclock_data *)trigger->data;
2832
        if (vclock_compare_ignore0(data->vclock, instance_vclock) <= 0) {
×
2833
                data->is_ready = true;
×
2834
                fiber_wakeup(data->waiter);
×
2835
        }
2836
        return 0;
×
2837
}
2838

2839
/**
2840
 * Wait until this instance's vclock reaches @a vclock or @a deadline is
2841
 * reached.
2842
 */
2843
static int
2844
box_wait_vclock(const struct vclock *vclock, double deadline)
15✔
2845
{
2846
        if (vclock_compare_ignore0(vclock, instance_vclock) <= 0)
15!
2847
                return 0;
15✔
2848
        struct trigger on_wal_write;
2849
        struct box_wait_vclock_data data = {
×
2850
                .is_ready = false,
2851
                .vclock = vclock,
2852
                .waiter = fiber(),
×
2853
        };
×
2854
        trigger_create(&on_wal_write, box_wait_vclock_f, &data, NULL);
×
2855
        trigger_add(&wal_on_write, &on_wal_write);
×
2856
        do {
×
2857
                if (fiber_yield_deadline(deadline))
×
2858
                        break;
×
2859
        } while (!data.is_ready && !fiber_is_cancelled());
×
2860
        trigger_clear(&on_wal_write);
×
2861
        if (fiber_is_cancelled()) {
×
2862
                diag_set(FiberIsCancelled);
×
2863
                return -1;
×
2864
        }
2865
        if (!data.is_ready) {
×
2866
                diag_set(TimedOut);
×
2867
                return -1;
×
2868
        }
2869
        return 0;
×
2870
}
2871

2872
int
2873
box_wait_linearization_point(double timeout)
16✔
2874
{
2875
        double deadline = ev_monotonic_now(loop()) + timeout;
16!
2876
        struct vclock confirmed_vclock;
2877
        vclock_create(&confirmed_vclock);
16✔
2878
        /*
2879
         * First find out the vclock which might be confirmed on remote
2880
         * instances.
2881
         */
2882
        if (box_collect_confirmed_vclock(&confirmed_vclock, deadline) != 0)
16!
2883
                return -1;
1✔
2884
        /* Then wait until all the rows up to this vclock are received. */
2885
        if (box_wait_vclock(&confirmed_vclock, deadline) != 0)
15!
2886
                return -1;
×
2887
        /*
2888
         * Finally, wait until all the synchronous transactions, which should be
2889
         * visible to this tx, become visible.
2890
         */
2891
        bool is_rollback;
2892
        timeout = deadline - ev_monotonic_now(loop());
15!
2893
        if (!txn_limbo_is_empty(&txn_limbo) &&
16✔
2894
            txn_limbo_wait_last_txn(&txn_limbo, &is_rollback, timeout) != 0)
1!
2895
                return -1;
1✔
2896
        return 0;
14✔
2897
}
2898

2899
/**
2900
 * Check whether the greatest promote term has changed since it was last read.
2901
 * In other words, check that a foreign PROMOTE arrived while we were sleeping.
2902
 */
2903
static int
2904
box_check_promote_term_intact(uint64_t promote_term)
1,169✔
2905
{
2906
        if (txn_limbo.promote_greatest_term != promote_term) {
1,169✔
2907
                diag_set(ClientError, ER_INTERFERING_PROMOTE,
6!
2908
                         txn_limbo.owner_id);
2909
                return -1;
6✔
2910
        }
2911
        return 0;
1,163✔
2912
}
2913

2914
/**
2915
 * Check whether the raft term has changed since it was last read.
2916
 */
2917
static int
2918
box_check_election_term_intact(uint64_t term)
966✔
2919
{
2920
        if (box_raft()->volatile_term != term) {
966✔
2921
                diag_set(ClientError, ER_INTERFERING_ELECTIONS);
4!
2922
                return -1;
4✔
2923
        }
2924
        return 0;
962✔
2925
}
2926

2927
/** Trigger a new election round but don't wait for its result. */
2928
static int
2929
box_trigger_elections(void)
318✔
2930
{
2931
        uint64_t promote_term = txn_limbo.promote_greatest_term;
318✔
2932
        raft_new_term(box_raft());
318✔
2933
        if (box_raft_wait_term_persisted() < 0)
318!
2934
                return -1;
×
2935
        return box_check_promote_term_intact(promote_term);
318✔
2936
}
2937

2938
/** Try waiting until limbo is emptied up to given timeout. */
2939
static int
2940
box_try_wait_confirm(double timeout)
318✔
2941
{
2942
        uint64_t promote_term = txn_limbo.promote_greatest_term;
318✔
2943
        txn_limbo_wait_empty(&txn_limbo, timeout);
318✔
2944
        return box_check_promote_term_intact(promote_term);
318✔
2945
}
2946

2947
/**
2948
 * A helper to wait until all limbo entries are ready to be confirmed, i.e.
2949
 * written to WAL and have gathered a quorum of ACKs from replicas.
2950
 * Return lsn of the last limbo entry on success, -1 on error.
2951
 */
2952
static int64_t
2953
box_wait_limbo_acked(double timeout)
492✔
2954
{
2955
        if (txn_limbo_is_empty(&txn_limbo))
492✔
2956
                return txn_limbo.confirmed_lsn;
440✔
2957

2958
        uint64_t promote_term = txn_limbo.promote_greatest_term;
52✔
2959
        int quorum = replication_synchro_quorum;
52✔
2960
        struct txn_limbo_entry *last_entry;
2961
        last_entry = txn_limbo_last_synchro_entry(&txn_limbo);
52✔
2962
        /* Wait for the last entry's WAL write. */
2963
        if (last_entry->lsn < 0) {
52✔
2964
                int64_t tid = last_entry->txn->id;
8✔
2965

2966
                journal_queue_flush();
8✔
2967
                if (wal_sync(NULL) != 0)
8✔
2968
                        return -1;
1✔
2969

2970
                if (box_check_promote_term_intact(promote_term) != 0)
7✔
2971
                        return -1;
1✔
2972
                if (txn_limbo_is_empty(&txn_limbo))
6✔
2973
                        return txn_limbo.confirmed_lsn;
1✔
2974
                if (tid != txn_limbo_last_synchro_entry(&txn_limbo)->txn->id) {
5✔
2975
                        diag_set(ClientError, ER_QUORUM_WAIT, quorum,
1!
2976
                                 "new synchronous transactions appeared");
2977
                        return -1;
1✔
2978
                }
2979
        }
2980
        assert(last_entry->lsn > 0);
48!
2981
        int64_t wait_lsn = last_entry->lsn;
48✔
2982

2983
        if (box_wait_quorum(txn_limbo.owner_id, wait_lsn, quorum, timeout) != 0)
48✔
2984
                return -1;
2✔
2985

2986
        if (box_check_promote_term_intact(promote_term) != 0)
46✔
2987
                return -1;
1✔
2988

2989
        if (txn_limbo_is_empty(&txn_limbo))
45✔
2990
                return txn_limbo.confirmed_lsn;
2✔
2991

2992
        if (quorum < replication_synchro_quorum) {
43✔
2993
                diag_set(ClientError, ER_QUORUM_WAIT, quorum,
1!
2994
                         "quorum was increased while waiting");
2995
                return -1;
1✔
2996
        }
2997
        if (wait_lsn < txn_limbo_last_synchro_entry(&txn_limbo)->lsn) {
42✔
2998
                diag_set(ClientError, ER_QUORUM_WAIT, quorum,
1!
2999
                         "new synchronous transactions appeared");
3000
                return -1;
1✔
3001
        }
3002

3003
        return wait_lsn;
41✔
3004
}
3005

3006
/** Write and process a PROMOTE request. */
3007
static int
3008
box_issue_promote(int64_t promote_lsn)
408✔
3009
{
3010
        int rc = 0;
408✔
3011
        uint64_t term = box_raft()->term;
408✔
3012
        uint64_t promote_term = txn_limbo.promote_greatest_term;
408✔
3013
        assert(promote_lsn >= 0);
408!
3014
        rc = box_check_election_term_intact(term);
408✔
3015
        if (rc != 0)
408✔
3016
                return rc;
1✔
3017

3018
        txn_limbo_begin(&txn_limbo);
407✔
3019
        rc = box_check_election_term_intact(term);
407✔
3020
        if (rc != 0)
407✔
3021
                goto end;
1✔
3022
        rc = box_check_promote_term_intact(promote_term);
406✔
3023
        if (rc != 0)
406!
3024
                goto end;
×
3025
        rc = txn_limbo_write_promote(&txn_limbo, promote_lsn, term);
406✔
3026

3027
end:
407✔
3028
        if (rc == 0) {
407✔
3029
                txn_limbo_commit(&txn_limbo);
404✔
3030
                assert(txn_limbo_is_empty(&txn_limbo));
404!
3031
        } else {
3032
                txn_limbo_rollback(&txn_limbo);
3✔
3033
        }
3034
        return rc;
407✔
3035
}
3036

3037
/** A guard to block multiple simultaneous promote()/demote() invocations. */
3038
static bool is_in_box_promote = false;
3039

3040
/** Write and process a DEMOTE request. */
3041
static int
3042
box_issue_demote(int64_t promote_lsn)
76✔
3043
{
3044
        int rc = 0;
76✔
3045
        uint64_t term = box_raft()->term;
76✔
3046
        uint64_t promote_term = txn_limbo.promote_greatest_term;
76✔
3047
        assert(promote_lsn >= 0);
76!
3048

3049
        rc = box_check_election_term_intact(term);
76✔
3050
        if (rc != 0)
76✔
3051
                return rc;
1✔
3052

3053
        txn_limbo_begin(&txn_limbo);
75✔
3054
        rc = box_check_election_term_intact(term);
75✔
3055
        if (rc != 0)
75✔
3056
                goto end;
1✔
3057
        rc = box_check_promote_term_intact(promote_term);
74✔
3058
        if (rc != 0)
74!
3059
                goto end;
×
3060
        rc = txn_limbo_write_demote(&txn_limbo, promote_lsn, term);
74✔
3061

3062
end:
75✔
3063
        if (rc == 0) {
75✔
3064
                txn_limbo_commit(&txn_limbo);
74✔
3065
                assert(txn_limbo_is_empty(&txn_limbo));
74!
3066
        } else {
3067
                txn_limbo_rollback(&txn_limbo);
1✔
3068
        }
3069
        return rc;
75✔
3070
}
3071

3072
int
3073
box_promote_qsync(void)
2,600✔
3074
{
3075
        if (is_in_box_promote) {
2,600✔
3076
                diag_set(ClientError, ER_IN_ANOTHER_PROMOTE);
2,322!
3077
                return -1;
2,322✔
3078
        }
3079
        assert(is_box_configured);
278!
3080
        struct raft *raft = box_raft();
278✔
3081
        is_in_box_promote = true;
278✔
3082
        auto promote_guard = make_scoped_guard([&] {
278✔
3083
                is_in_box_promote = false;
278✔
3084
        });
556!
3085
        assert(raft->state == RAFT_STATE_LEADER);
278!
3086
        if (txn_limbo_replica_term(&txn_limbo, instance_id) == raft->term)
278✔
3087
                return 0;
101✔
3088
        int64_t wait_lsn = box_wait_limbo_acked(TIMEOUT_INFINITY);
177!
3089
        if (wait_lsn < 0)
177!
3090
                return -1;
×
3091
        if (raft->state != RAFT_STATE_LEADER) {
177!
3092
                diag_set(ClientError, ER_NOT_LEADER, raft->leader);
×
3093
                return -1;
×
3094
        }
3095
        return box_issue_promote(wait_lsn);
177!
3096
}
3097

3098
int
3099
box_check_promote(void) {
447✔
3100
        if (is_in_box_promote) {
447✔
3101
                diag_set(ClientError, ER_UNSUPPORTED, "box.ctl.promote/demote",
2!
3102
                         "simultaneous invocations");
3103
                return -1;
2✔
3104
        }
3105
        if (cfg_replication_anon) {
445✔
3106
                diag_set(ClientError, ER_UNSUPPORTED, "replication_anon=true",
2!
3107
                         "manual elections");
3108
                return -1;
2✔
3109
        }
3110
        return 0;
443✔
3111
}
3112

3113
int
3114
box_promote(void)
322✔
3115
{
3116
        if (!is_box_configured)
322✔
3117
                return 0;
1✔
3118
        if (box_check_promote() != 0)
321!
3119
                return -1;
2✔
3120

3121
        struct raft *raft = box_raft();
319✔
3122
        is_in_box_promote = true;
319✔
3123
        auto promote_guard = make_scoped_guard([&] {
319✔
3124
                is_in_box_promote = false;
319✔
3125
        });
638!
3126
        /*
3127
         * Currently active leader (the instance that is seen as leader by both
3128
         * raft and txn_limbo) can't issue another PROMOTE.
3129
         */
3130
        bool is_leader =
3131
                txn_limbo_replica_term(&txn_limbo, instance_id) == raft->term &&
372✔
3132
                txn_limbo_is_owned_by_current_instance(&txn_limbo) &&
372✔
3133
                !txn_limbo.is_frozen_until_promotion;
34✔
3134
        if (box_election_mode != ELECTION_MODE_OFF)
319✔
3135
                is_leader = is_leader && raft->state == RAFT_STATE_LEADER;
75✔
3136

3137
        if (is_leader)
319✔
3138
                return 0;
15✔
3139
        switch (box_election_mode) {
304!
3140
        case ELECTION_MODE_OFF:
241✔
3141
                if (box_try_wait_confirm(2 * replication_synchro_timeout) != 0)
241!
3142
                        return -1;
1✔
3143
                if (box_trigger_elections() != 0)
240!
3144
                        return -1;
1✔
3145
                break;
239✔
3146
        case ELECTION_MODE_VOTER:
1✔
3147
                assert(raft->state == RAFT_STATE_FOLLOWER);
1!
3148
                diag_set(ClientError, ER_UNSUPPORTED, "election_mode='voter'",
1!
3149
                         "manual elections");
3150
                return -1;
1✔
3151
        case ELECTION_MODE_MANUAL:
62✔
3152
        case ELECTION_MODE_CANDIDATE:
3153
                if (raft->state == RAFT_STATE_LEADER)
62✔
3154
                        return 0;
1✔
3155
                is_in_box_promote = false;
61✔
3156
                return box_raft_try_promote();
61!
3157
        default:
×
3158
                unreachable();
×
3159
        }
3160

3161
        int64_t wait_lsn = box_wait_limbo_acked(replication_synchro_timeout);
239!
3162
        if (wait_lsn < 0)
239✔
3163
                return -1;
8✔
3164

3165
        return box_issue_promote(wait_lsn);
231!
3166
}
3167

3168
int
3169
box_demote(void)
127✔
3170
{
3171
        if (!is_box_configured)
127✔
3172
                return 0;
1✔
3173
        if (box_check_promote() != 0)
126!
3174
                return -1;
2✔
3175

3176
        is_in_box_promote = true;
124✔
3177
        auto promote_guard = make_scoped_guard([&] {
124✔
3178
                is_in_box_promote = false;
124✔
3179
        });
248!
3180

3181
        const struct raft *raft = box_raft();
124✔
3182
        if (box_election_mode != ELECTION_MODE_OFF) {
124✔
3183
                if (txn_limbo_replica_term(&txn_limbo, instance_id) !=
8✔
3184
                    raft->term)
8!
3185
                        return 0;
×
3186
                if (!txn_limbo_is_owned_by_current_instance(&txn_limbo))
8✔
3187
                        return 0;
1✔
3188
                box_raft_leader_step_off();
7!
3189
                return 0;
7✔
3190
        }
3191

3192
        assert(raft->state == RAFT_STATE_FOLLOWER);
116!
3193
        if (raft->leader != REPLICA_ID_NIL) {
116✔
3194
                diag_set(ClientError, ER_NOT_LEADER, raft->leader);
1!
3195
                return -1;
1✔
3196
        }
3197
        if (txn_limbo.owner_id == REPLICA_ID_NIL)
115✔
3198
                return 0;
34✔
3199
        /*
3200
         * If the limbo term is up to date with Raft, then it might have
3201
         * a valid owner right now. Demotion would disrupt it. In this
3202
         * case the user has to explicitly overthrow the old owner with
3203
         * local promote(), or call demote() on the actual owner.
3204
         */
3205
        if (txn_limbo.promote_greatest_term == raft->term &&
161✔
3206
            !txn_limbo_is_owned_by_current_instance(&txn_limbo))
80✔
3207
                return 0;
3✔
3208
        if (box_trigger_elections() != 0)
78!
3209
                return -1;
1✔
3210
        if (box_try_wait_confirm(2 * replication_synchro_timeout) < 0)
77!
3211
                return -1;
1✔
3212
        int64_t wait_lsn = box_wait_limbo_acked(replication_synchro_timeout);
76!
3213
        if (wait_lsn < 0)
76!
3214
                return -1;
×
3215
        return box_issue_demote(wait_lsn);
76!
3216
}
3217

3218
int
3219
box_listen(void)
4,967✔
3220
{
3221
        struct uri_set uri_set;
3222
        if (box_check_listen(&uri_set) != 0)
4,967!
3223
                return -1;
56✔
3224
        int rc = iproto_listen(&uri_set);
4,911!
3225
        uri_set_destroy(&uri_set);
4,911!
3226
        return rc;
4,911✔
3227
}
3228

3229
void
3230
box_set_io_collect_interval(void)
344✔
3231
{
3232
        ev_set_io_collect_interval(loop(), cfg_getd("io_collect_interval"));
344!
3233
}
344✔
3234

3235
void
3236
box_set_snap_io_rate_limit(void)
377✔
3237
{
3238
        struct memtx_engine *memtx;
3239
        memtx = (struct memtx_engine *)engine_by_name("memtx");
377✔
3240
        assert(memtx != NULL);
377!
3241
        memtx_engine_set_snap_io_rate_limit(memtx,
377✔
3242
                        cfg_getd("snap_io_rate_limit"));
3243
        struct engine *vinyl = engine_by_name("vinyl");
377✔
3244
        assert(vinyl != NULL);
377!
3245
        vinyl_engine_set_snap_io_rate_limit(vinyl,
377✔
3246
                        cfg_getd("snap_io_rate_limit"));
3247
}
377✔
3248

3249
void
3250
box_set_memtx_memory(void)
360✔
3251
{
3252
        struct memtx_engine *memtx;
3253
        memtx = (struct memtx_engine *)engine_by_name("memtx");
360✔
3254
        assert(memtx != NULL);
360!
3255
        ssize_t size = box_check_memory_quota("memtx_memory");
360✔
3256
        if (size < 0)
360✔
3257
                diag_raise();
2✔
3258
        memtx_engine_set_memory_xc(memtx, size);
358✔
3259
}
353✔
3260

3261
void
3262
box_set_memtx_max_tuple_size(void)
4,825✔
3263
{
3264
        struct memtx_engine *memtx;
3265
        memtx = (struct memtx_engine *)engine_by_name("memtx");
4,825✔
3266
        assert(memtx != NULL);
4,825!
3267
        memtx_engine_set_max_tuple_size(memtx,
4,825✔
3268
                        cfg_geti("memtx_max_tuple_size"));
4,825✔
3269
}
4,825✔
3270

3271
void
3272
box_set_too_long_threshold(void)
4,825✔
3273
{
3274
        too_long_threshold = cfg_getd("too_long_threshold");
4,825✔
3275

3276
        struct engine *vinyl = engine_by_name("vinyl");
4,825✔
3277
        assert(vinyl != NULL);
4,825!
3278
        vinyl_engine_set_too_long_threshold(vinyl, too_long_threshold);
4,825✔
3279
}
4,825✔
3280

3281
void
3282
box_set_readahead(void)
4,824✔
3283
{
3284
        int readahead = cfg_geti("readahead");
4,824✔
3285
        box_check_readahead(readahead);
4,824✔
3286
        iproto_readahead = readahead;
4,824✔
3287
}
4,824✔
3288

3289
void
3290
box_set_checkpoint_count(void)
4,766✔
3291
{
3292
        int checkpoint_count = cfg_geti("checkpoint_count");
4,766✔
3293
        box_check_checkpoint_count(checkpoint_count);
4,766✔
3294
        gc_set_min_checkpoint_count(checkpoint_count);
4,765✔
3295
}
4,765✔
3296

3297
void
3298
box_set_checkpoint_interval(void)
4,722✔
3299
{
3300
        double interval = cfg_getd("checkpoint_interval");
4,722✔
3301
        gc_set_checkpoint_interval(interval);
4,722✔
3302
}
4,722✔
3303

3304
void
3305
box_set_checkpoint_wal_threshold(void)
4,715✔
3306
{
3307
        int64_t threshold = cfg_geti64("checkpoint_wal_threshold");
4,715✔
3308
        wal_set_checkpoint_threshold(threshold);
4,715✔
3309
}
4,715✔
3310

3311
int
3312
box_set_wal_queue_max_size(void)
4,858✔
3313
{
3314
        int64_t size = box_check_wal_queue_max_size();
4,858✔
3315
        if (size < 0)
4,858!
3316
                return -1;
×
3317
        wal_set_queue_max_size(size);
4,858✔
3318
        return 0;
4,858✔
3319
}
3320

3321
int
3322
box_set_replication_synchro_queue_max_size(void)
8,435✔
3323
{
3324
        int64_t size = box_check_replication_synchro_queue_max_size();
8,435✔
3325
        if (size < 0)
8,435!
3326
                return -1;
×
3327
        if (size != INT64_MAX && recovery_state != FINISHED_RECOVERY) {
8,435✔
3328
                say_info("The option replication_synchro_queue_max_size will "
4,474✔
3329
                         "actually take effect after the recovery is finished");
3330
                txn_limbo_set_max_size(&txn_limbo, INT64_MAX);
4,474✔
3331
                return 0;
4,474✔
3332
        }
3333
        txn_limbo_set_max_size(&txn_limbo, size);
3,961✔
3334
        return 0;
3,961✔
3335
}
3336

3337
int
3338
box_set_wal_cleanup_delay(void)
83✔
3339
{
3340
        double delay = box_check_wal_cleanup_delay();
83✔
3341
        if (delay < 0)
83!
3342
                return -1;
×
3343
        /*
3344
         * Anonymous replicas do not require
3345
         * delay since they can't be a source
3346
         * of replication.
3347
         */
3348
        if (box_is_anon())
83!
3349
                delay = 0;
×
3350
        gc_set_wal_cleanup_delay(delay);
83✔
3351
        return 0;
83✔
3352
}
3353

3354
int
3355
box_set_wal_retention_period(void)
×
3356
{
3357
        double delay = box_check_wal_retention_period();
×
3358
        if (delay < 0)
×
3359
                return -1;
×
3360
        wal_set_retention_period(delay);
×
3361
        return 0;
×
3362
}
3363

3364
void
3365
box_set_vinyl_memory(void)
348✔
3366
{
3367
        struct engine *vinyl = engine_by_name("vinyl");
348✔
3368
        assert(vinyl != NULL);
348!
3369
        ssize_t size = box_check_memory_quota("vinyl_memory");
348✔
3370
        if (size < 0)
348✔
3371
                diag_raise();
2✔
3372
        vinyl_engine_set_memory_xc(vinyl, size);
346✔
3373
}
345✔
3374

3375
void
3376
box_set_vinyl_max_tuple_size(void)
4,831✔
3377
{
3378
        struct engine *vinyl = engine_by_name("vinyl");
4,831✔
3379
        assert(vinyl != NULL);
4,831!
3380
        vinyl_engine_set_max_tuple_size(vinyl,
4,831✔
3381
                        cfg_geti("vinyl_max_tuple_size"));
4,831✔
3382
}
4,831✔
3383

3384
void
3385
box_set_vinyl_cache(void)
4,846✔
3386
{
3387
        struct engine *vinyl = engine_by_name("vinyl");
4,846✔
3388
        assert(vinyl != NULL);
4,846!
3389
        vinyl_engine_set_cache(vinyl, cfg_geti64("vinyl_cache"));
4,846✔
3390
}
4,846✔
3391

3392
void
3393
box_set_vinyl_timeout(void)
4,824✔
3394
{
3395
        struct engine *vinyl = engine_by_name("vinyl");
4,824✔
3396
        assert(vinyl != NULL);
4,824!
3397
        vinyl_engine_set_timeout(vinyl, cfg_getd("vinyl_timeout"));
4,824✔
3398
}
4,824✔
3399

3400
void
3401
box_set_force_recovery(void)
4,631✔
3402
{
3403
        box_is_force_recovery = cfg_geti("force_recovery");
4,631✔
3404
}
4,631✔
3405

3406
void
3407
box_set_net_msg_max(void)
4,832✔
3408
{
3409
        int new_iproto_msg_max = cfg_geti("net_msg_max");
4,832✔
3410
        if (iproto_set_msg_max(new_iproto_msg_max) != 0)
4,832✔
3411
                diag_raise();
1✔
3412
        fiber_pool_set_max_size(&tx_fiber_pool,
4,831✔
3413
                                new_iproto_msg_max *
3414
                                IPROTO_FIBER_POOL_SIZE_FACTOR);
3415
}
4,831✔
3416

3417
int
3418
box_set_prepared_stmt_cache_size(void)
9,208✔
3419
{
3420
        int cache_sz = cfg_geti("sql_cache_size");
9,208✔
3421
        if (box_check_sql_cache_size(cache_sz) != 0)
9,208!
3422
                return -1;
×
3423
        if (sql_stmt_cache_set_size(cache_sz) != 0)
9,208!
3424
                return -1;
×
3425
        return 0;
9,208✔
3426
}
3427

3428
/**
3429
 * Report crash information to the feedback daemon
3430
 * (i.e. send it to the configured feedback host).
3431
 */
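/*
 * A sketch of the report assembled below (values shortened, optional
 * fields depend on the build and the signal):
 *
 *   {"crashdump":{"version":"1","data":{
 *       "uname":{"sysname":"...","release":"...","version":"...","machine":"..."},
 *       "instance":{"server_id":"<uuid>","cluster_id":"<uuid>","uptime":"<sec>"},
 *       "build":{"version":"...","cmake_type":"..."},
 *       "signal":{"signo":...,"si_code":...,"si_addr":"0x...","timestamp":"..."}}}}
 */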
3432
static int
3433
box_feedback_report_crash(struct crash_info *cinfo)
1✔
3434
{
3435
        /*
3436
         * Update to a new number if the format gets changed.
3437
         */
3438
        const int crashinfo_version = 1;
1✔
3439

3440
        char *p = (char *)static_alloc(SMALL_STATIC_SIZE);
1✔
3441
        char *tail = &p[SMALL_STATIC_SIZE];
1✔
3442
        char *e = &p[SMALL_STATIC_SIZE];
1✔
3443
        char *head = p;
1✔
3444

3445
        int total = 0;
1✔
3446
        (void)total;
3447
        int size = 0;
1✔
3448

3449
#define snprintf_safe(...) SNPRINT(total, snprintf, p, size, __VA_ARGS__)
3450
#define jnprintf_safe(str) SNPRINT(total, json_escape, p, size, str)
3451

3452
        /*
3453
         * Let's reuse the tail of the buffer as temporary space.
3454
         */
3455
        struct utsname *uname_ptr =
1✔
3456
                (struct utsname *)&tail[-sizeof(struct utsname)];
3457
        if (p >= (char *)uname_ptr)
1!
3458
                return -1;
×
3459

3460
        if (uname(uname_ptr) != 0) {
1!
3461
                say_syserror("uname call failed, ignore");
×
3462
                memset(uname_ptr, 0, sizeof(struct utsname));
×
3463
        }
3464

3465
        /*
3466
         * Start filling the report. The "data" key value is
3467
         * filled as a separate code block for easier
3468
         * modifications in the future.
3469
         */
3470
        size = (char *)uname_ptr - p;
1✔
3471
        snprintf_safe("{");
1!
3472
        snprintf_safe("\"crashdump\":{");
1!
3473
        snprintf_safe("\"version\":\"%d\",", crashinfo_version);
1!
3474
        snprintf_safe("\"data\":");
1!
3475

3476
        /* The "data" key value */
3477
        snprintf_safe("{");
1!
3478
        snprintf_safe("\"uname\":{");
1!
3479
        snprintf_safe("\"sysname\":\"");
1!
3480
        jnprintf_safe(uname_ptr->sysname);
1!
3481
        snprintf_safe("\",");
1!
3482
        /*
3483
         * nodename might contain sensitive information, skip it.
3484
         */
3485
        snprintf_safe("\"release\":\"");
1!
3486
        jnprintf_safe(uname_ptr->release);
1!
3487
        snprintf_safe("\",");
1!
3488

3489
        snprintf_safe("\"version\":\"");
1!
3490
        jnprintf_safe(uname_ptr->version);
1!
3491
        snprintf_safe("\",");
1!
3492

3493
        snprintf_safe("\"machine\":\"");
1!
3494
        jnprintf_safe(uname_ptr->machine);
1!
3495
        snprintf_safe("\"");
1!
3496
        snprintf_safe("},");
1!
3497

3498
        /* Extend size, because now uname_ptr is not needed. */
3499
        size = e - p;
1✔
3500

3501
        /*
3502
         * Instance block requires uuid encoding so take it
3503
         * from the tail of the buffer.
3504
         */
3505
        snprintf_safe("\"instance\":{");
1!
3506
        char *uuid_buf = &tail[-(UUID_STR_LEN + 1)];
1✔
3507
        if (p >= uuid_buf)
1!
3508
                return -1;
×
3509
        size = uuid_buf - p;
1✔
3510

3511
        tt_uuid_to_string(&INSTANCE_UUID, uuid_buf);
1✔
3512
        snprintf_safe("\"server_id\":\"%s\",", uuid_buf);
1!
3513
        tt_uuid_to_string(&REPLICASET_UUID, uuid_buf);
1✔
3514
        snprintf_safe("\"cluster_id\":\"%s\",", uuid_buf);
1!
3515

3516
        /* No need for uuid_buf anymore. */
3517
        size = e - p;
1✔
3518

3519
        struct timespec ts;
3520
        time_t uptime = 0;
1✔
3521
        if (clock_gettime(CLOCK_MONOTONIC, &ts) == 0)
1!
3522
                uptime = ts.tv_sec - tarantool_start_time;
1✔
3523
        snprintf_safe("\"uptime\":\"%llu\"", (unsigned long long)uptime);
1!
3524
        snprintf_safe("},");
1!
3525

3526
        snprintf_safe("\"build\":{");
1!
3527
        snprintf_safe("\"version\":\"%s\",", PACKAGE_VERSION);
1!
3528
        snprintf_safe("\"cmake_type\":\"%s\"", BUILD_INFO);
1!
3529
        snprintf_safe("},");
1!
3530

3531
        snprintf_safe("\"signal\":{");
1!
3532
        snprintf_safe("\"signo\":%d,", cinfo->signo);
1!
3533
        snprintf_safe("\"si_code\":%d,", cinfo->sicode);
1!
3534
        if (cinfo->signo == SIGSEGV) {
1!
3535
                if (cinfo->sicode == SEGV_MAPERR) {
×
3536
                        snprintf_safe("\"si_code_str\":\"%s\",",
×
3537
                                      "SEGV_MAPERR");
3538
                } else if (cinfo->sicode == SEGV_ACCERR) {
×
3539
                        snprintf_safe("\"si_code_str\":\"%s\",",
×
3540
                                      "SEGV_ACCERR");
3541
                }
3542
        }
3543
        snprintf_safe("\"si_addr\":\"0x%llx\",",
1!
3544
                      (long long)cinfo->siaddr);
3545

3546
#ifdef ENABLE_BACKTRACE
3547
        snprintf_safe("\"backtrace\":\"");
1!
3548
        jnprintf_safe(cinfo->backtrace_buf);
1!
3549
        snprintf_safe("\",");
1!
3550
#endif
3551

3552
        /* 64 bytes should be enough for the longest localtime string. */
3553
        const int ts_size = 64;
1✔
3554
        char *timestamp_rt_str = &tail[-ts_size];
1✔
3555
        if (p >= timestamp_rt_str)
1!
3556
                return -1;
×
3557

3558
        struct tm tm;
3559
        localtime_r(&cinfo->timestamp_rt, &tm);
1✔
3560
        strftime(timestamp_rt_str, ts_size, "%F %T %Z", &tm);
1✔
3561
        timestamp_rt_str[ts_size - 1] = '\0';
1✔
3562

3563
        size = timestamp_rt_str - p;
1✔
3564
        snprintf_safe("\"timestamp\":\"");
1!
3565
        jnprintf_safe(timestamp_rt_str);
1!
3566
        snprintf_safe("\"");
1!
3567
        snprintf_safe("}");
1!
3568
        snprintf_safe("}");
1!
3569

3570
        /* Finalize the "data" key and the whole dump. */
3571
        size = e - p;
1✔
3572
        snprintf_safe("}");
1!
3573
        snprintf_safe("}");
1!
3574

3575
#undef snprintf_safe
3576
#undef jnprintf_safe
3577

3578
        say_debug("crash dump: %s", head);
1!
3579

3580
        /* Timeout 1 sec is taken from the feedback daemon. */
3581
        const char *expr =
1✔
3582
                "require('http.client').post(arg[1],arg[2],{timeout=1});"
3583
                "os.exit(1);";
3584
        const char *exec_argv[7] = {
1✔
3585
                tarantool_path,
3586
                "-e", expr,
3587
                "-",
3588
                box_feedback_host,
3589
                head,
3590
                NULL,
3591
        };
1✔
3592

3593
        extern char **environ;
3594
        int rc = posix_spawn(NULL, exec_argv[0], NULL, NULL,
1!
3595
                             (char **)exec_argv, environ);
3596
        if (rc != 0) {
1!
3597
                fprintf(stderr,
×
3598
                        "posix_spawn with "
3599
                        "exec(%s,[%s,%s,%s,%s,%s,%s,%s]) failed: %s\n",
3600
                        exec_argv[0], exec_argv[0], exec_argv[1], exec_argv[2],
3601
                        exec_argv[3], exec_argv[4], exec_argv[5], exec_argv[6],
3602
                        tt_strerror(rc));
3603
                return -1;
×
3604
        }
3605

3606
        return 0;
1✔
3607
}
3608

3609
/**
3610
 * Box callback to handle crashes.
3611
 */
3612
static void
3613
box_crash_callback(struct crash_info *cinfo)
3✔
3614
{
3615
        if (cinfo->signo == SIGBUS &&
3!
3616
            flightrec_is_mmapped_address(cinfo->siaddr)) {
×
3617
                fprintf(stderr, "error accessing flightrec file\n");
×
3618
                fflush(stderr);
×
3619
                _exit(EXIT_FAILURE);
×
3620
        }
3621

3622
        crash_report_stderr(cinfo);
3✔
3623

3624
        if (box_feedback_crash_enabled &&
4!
3625
            box_feedback_report_crash(cinfo) != 0)
1!
3626
                fprintf(stderr, "unable to send a crash report\n");
×
3627
}
3✔
3628

3629
int
3630
box_set_feedback(void)
4,726✔
3631
{
3632
        const char *host = cfg_gets("feedback_host");
4,726✔
3633

3634
        if (host != NULL && strlen(host) >= BOX_FEEDBACK_HOST_MAX) {
4,726!
3635
                diag_set(ClientError, ER_CFG, "feedback_host",
×
3636
                         "the address is too long");
3637
                return -1;
×
3638
        }
3639

3640
        if (cfg_getb("feedback_enabled") &&
4,726✔
3641
            cfg_getb("feedback_crashinfo") &&
4,726!
3642
            host != NULL) {
3643
                box_feedback_crash_enabled = true;
4,722✔
3644
                strlcpy(box_feedback_host, host, sizeof(box_feedback_host));
4,722✔
3645
                say_debug("enable sending crashinfo feedback");
4,722✔
3646
        } else {
3647
                box_feedback_crash_enabled = false;
4✔
3648
                box_feedback_host[0] = '\0';
4✔
3649
                say_debug("disable sending crashinfo feedback");
4!
3650
        }
3651

3652
        return 0;
4,726✔
3653
}
3654

3655
int
3656
box_set_txn_timeout(void)
4,717✔
3657
{
3658
        double timeout = box_check_txn_timeout();
4,717✔
3659
        if (timeout < 0)
4,717✔
3660
                return -1;
2✔
3661
        txn_timeout_default = timeout;
4,715✔
3662
        return 0;
4,715✔
3663
}
3664

3665
int
3666
box_set_txn_isolation(void)
4,785✔
3667
{
3668
        enum txn_isolation_level level = box_check_txn_isolation();
4,785✔
3669
        if (level == txn_isolation_level_MAX)
4,785✔
3670
                return -1;
6✔
3671
        txn_default_isolation = level;
4,779✔
3672
        return 0;
4,779✔
3673
}
3674

3675
/* }}} configuration bindings */
3676

3677
/**
3678
 * Execute a request against a given space id with
3679
 * a variable-argument tuple described in format.
3680
 *
3681
 * @example: you want to insert 5 into space 1:
3682
 * boxk(IPROTO_INSERT, 1, "[%u]", 5);
3683
 *
3684
 * @example: you want to set field 3 (base 0) of
3685
 * a tuple with key [10, 20] in space 1 to 1000:
3686
 * boxk(IPROTO_UPDATE, 1, "[%u%u][[%s%u%u]]", 10, 20, "=", 3, 1000);
3687
 *
3688
 * @note Since this is for internal use, it has
3689
 * no boundary or misuse checks.
3690
 */
3691
int
3692
boxk(int type, uint32_t space_id, const char *format, ...)
13,515✔
3693
{
3694
        va_list ap;
3695
        struct request request;
3696
        memset(&request, 0, sizeof(request));
13,515✔
3697
        request.type = type;
13,515✔
3698
        request.space_id = space_id;
13,515✔
3699
        va_start(ap, format);
13,515✔
3700
        struct region *region = &fiber()->gc;
13,515!
3701
        size_t size = 0;
13,515✔
3702
        RegionGuard region_guard(region);
27,030✔
3703
        const char *data = mp_vformat_on_region(region, &size, format, ap);
13,515!
3704
        va_end(ap);
13,515✔
3705
        const char *data_end = data + size;
13,515✔
3706
        switch (type) {
13,515!
3707
        case IPROTO_INSERT:
6,877✔
3708
        case IPROTO_REPLACE:
3709
                request.tuple = data;
6,877✔
3710
                request.tuple_end = data_end;
6,877✔
3711
                break;
6,877✔
3712
        case IPROTO_DELETE:
6,614✔
3713
                request.key = data;
6,614✔
3714
                request.key_end = data_end;
6,614✔
3715
                break;
6,614✔
3716
        case IPROTO_UPDATE:
24✔
3717
                request.key = data;
24✔
3718
                mp_next(&data);
24✔
3719
                request.key_end = data;
24✔
3720
                request.tuple = data;
24✔
3721
                mp_next(&data);
24✔
3722
                request.tuple_end = data;
24✔
3723
                request.index_base = 0;
24✔
3724
                break;
24✔
3725
        default:
×
3726
                unreachable();
×
3727
        }
3728
        struct space *space = space_cache_find(space_id);
13,515!
3729
        if (space == NULL)
13,515!
3730
                return -1;
×
3731
        return box_process_rw(&request, space, NULL);
13,515!
3732
}
3733

3734
API_EXPORT int
3735
box_return_tuple(box_function_ctx_t *ctx, box_tuple_t *tuple)
20✔
3736
{
3737
        port_c_add_tuple(ctx->port, tuple);
20✔
3738
        return 0;
20✔
3739
}
3740

3741
API_EXPORT int
3742
box_return_mp(box_function_ctx_t *ctx, const char *mp, const char *mp_end)
98✔
3743
{
3744
        port_c_add_mp(ctx->port, mp, mp_end);
98✔
3745
        return 0;
98✔
3746
}
3747

3748
/* schema_find_id()-like method using only public API */
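/*
 * Illustrative usage from a C module ("customers" is a hypothetical space):
 *
 *     uint32_t id = box_space_id_by_name("customers", strlen("customers"));
 *     if (id == BOX_ID_NIL)
 *             ...; // not found or error, the two are indistinguishable
 */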
3749
API_EXPORT uint32_t
3750
box_space_id_by_name(const char *name, uint32_t len)
2,468✔
3751
{
3752
        if (len > BOX_NAME_MAX)
2,468!
3753
                return BOX_ID_NIL;
×
3754
        uint32_t size = mp_sizeof_array(1) + mp_sizeof_str(len);
2,468✔
3755
        RegionGuard region_guard(&fiber()->gc);
4,936!
3756
        char *begin = (char *) region_alloc(&fiber()->gc, size);
2,468!
3757
        if (begin == NULL) {
2,468!
3758
                diag_set(OutOfMemory, size, "region_alloc", "begin");
×
3759
                return BOX_ID_NIL;
×
3760
        }
3761
        char *end = mp_encode_array(begin, 1);
2,468✔
3762
        end = mp_encode_str(end, name, len);
2,468✔
3763

3764
        /* NOTE: error and missing key cases are indistinguishable */
3765
        box_tuple_t *tuple;
3766
        if (box_index_get(BOX_VSPACE_ID, 2, begin, end, &tuple) != 0)
2,468!
3767
                return BOX_ID_NIL;
×
3768
        if (tuple == NULL)
2,468✔
3769
                return BOX_ID_NIL;
14✔
3770
        uint32_t result = BOX_ID_NIL;
2,454✔
3771
        (void) tuple_field_u32(tuple, BOX_SPACE_FIELD_ID, &result);
2,454!
3772
        return result;
2,454✔
3773
}
3774

3775
API_EXPORT uint32_t
3776
box_index_id_by_name(uint32_t space_id, const char *name, uint32_t len)
15✔
3777
{
3778
        if (len > BOX_NAME_MAX)
15!
3779
                return BOX_ID_NIL;
×
3780
        uint32_t size = mp_sizeof_array(2) + mp_sizeof_uint(space_id) +
15✔
3781
                        mp_sizeof_str(len);
15✔
3782
        RegionGuard region_guard(&fiber()->gc);
30!
3783
        char *begin = (char *) region_alloc(&fiber()->gc, size);
15!
3784
        if (begin == NULL) {
15!
3785
                diag_set(OutOfMemory, size, "region_alloc", "begin");
×
3786
                return BOX_ID_NIL;
×
3787
        }
3788
        char *end = mp_encode_array(begin, 2);
15✔
3789
        end = mp_encode_uint(end, space_id);
15✔
3790
        end = mp_encode_str(end, name, len);
15✔
3791

3792
        /* NOTE: error and missing key cases are indistinguishable */
3793
        box_tuple_t *tuple;
3794
        if (box_index_get(BOX_VINDEX_ID, 2, begin, end, &tuple) != 0)
15!
3795
                return BOX_ID_NIL;
×
3796
        if (tuple == NULL)
15!
3797
                return BOX_ID_NIL;
×
3798
        uint32_t result = BOX_ID_NIL;
15✔
3799
        (void) tuple_field_u32(tuple, BOX_INDEX_FIELD_ID, &result);
15!
3800
        return result;
15✔
3801
}
3802

3803
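/**
 * Single-statement entry point used by the public DML wrappers below: checks
 * the execution time slice, looks up the space, applies the read-only and
 * recovery restrictions and forwards the request to box_process_rw().
 */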
int
3804
box_process1(struct request *request, box_tuple_t **result)
13,230,800✔
3805
{
3806
        if (box_check_slice() != 0)
13,230,800✔
3807
                return -1;
1✔
3808
        struct space *space = space_cache_find(request->space_id);
13,230,800✔
3809
        if (space == NULL)
13,230,800✔
3810
                return -1;
4✔
3811
        /*
3812
          * Allow writes to data-temporary and local spaces in read-only
3813
         * mode. To handle space truncation and/or ddl operations on temporary
3814
         * spaces, we postpone the read-only check for the _truncate, _space &
3815
         * _index system spaces till the on_replace trigger is called, when
3816
         * we know which spaces are concerned.
3817
         */
3818
        uint32_t id = space_id(space);
13,230,800✔
3819
        if (is_ro_summary &&
903✔
3820
            id != BOX_TRUNCATE_ID &&
893✔
3821
            id != BOX_SPACE_ID &&
873✔
3822
            id != BOX_INDEX_ID &&
855✔
3823
            !space_is_data_temporary(space) &&
855✔
3824
            !space_is_local(space) &&
13,231,700✔
3825
            box_check_writable() != 0)
31!
3826
                return -1;
31✔
3827
        if (space_is_memtx(space)) {
13,230,700✔
3828
                /*
3829
                 * Due to on_init_schema triggers set on system spaces,
3830
                 * we can insert data during recovery to local and
3831
                 * data-temporary spaces. However, until recovery is finished,
3832
                 * we can't check key uniqueness (since indexes are not
3833
                 * yet built). So reject any attempts to write into these
3834
                 * spaces.
3835
                 */
3836
                if (memtx_space_is_recovering(space)) {
11,942,900✔
3837
                        diag_set(ClientError, ER_UNSUPPORTED, "Snapshot recovery",
4!
3838
                                "write requests, use "
3839
                                "box.ctl.is_recovery_finished() "
3840
                                "to check that snapshot recovery was completed");
3841
                        diag_log();
4✔
3842
                        return -1;
4✔
3843
                }
3844
        }
3845

3846
        return box_process_rw(request, space, result);
13,230,700✔
3847
}
3848

3849
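/**
 * Pack a raw iterator position into a buffer allocated on the fiber region
 * so that it can be returned to the caller. When nothing was found or there
 * is no position, *packed_pos and *packed_pos_end are set to NULL.
 */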
void
3850
box_iterator_position_pack(const char *pos, const char *pos_end,
704✔
3851
                           uint32_t found, const char **packed_pos,
3852
                           const char **packed_pos_end)
3853
{
3854
        assert(packed_pos != NULL);
704!
3855
        assert(packed_pos_end != NULL);
704!
3856
        if (found > 0 && pos != NULL) {
704!
3857
                uint32_t buf_size =
3858
                        iterator_position_pack_bufsize(pos, pos_end);
535✔
3859
                char *buf =
3860
                        (char *)xregion_alloc(&fiber()->gc, buf_size);
535!
3861
                iterator_position_pack(pos, pos_end, buf, buf_size,
535✔
3862
                                       packed_pos, packed_pos_end);
535✔
3863
        } else {
3864
                *packed_pos = NULL;
169✔
3865
                *packed_pos_end = NULL;
169✔
3866
        }
3867
}
704✔
3868

3869
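/**
 * Unpack an iterator position received from a client and validate it against
 * the key, the iterator type and the index cmp_def. On success *pos and
 * *pos_end point into the fiber region; they are set to NULL when no position
 * was passed. Returns 0 on success, -1 on error.
 */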
int
3870
box_iterator_position_unpack(const char *packed_pos,
1,948,110✔
3871
                             const char *packed_pos_end,
3872
                             struct key_def *cmp_def, const char *key,
3873
                             uint32_t key_part_count, int iterator,
3874
                             const char **pos, const char **pos_end)
3875
{
3876
        assert(pos != NULL);
1,948,110!
3877
        assert(pos_end != NULL);
1,948,110!
3878
        if (packed_pos != NULL && packed_pos != packed_pos_end) {
1,948,110✔
3879
                uint32_t buf_size =
3880
                        iterator_position_unpack_bufsize(packed_pos,
1,004✔
3881
                                                         packed_pos_end);
1,004✔
3882
                char *buf = (char *)xregion_alloc(&fiber()->gc, buf_size);
1,004!
3883
                if (iterator_position_unpack(packed_pos, packed_pos_end,
1,004✔
3884
                                             buf, buf_size,
3885
                                             pos, pos_end) != 0)
1,004✔
3886
                        return -1;
3✔
3887
                uint32_t pos_part_count = mp_decode_array(pos);
1,001✔
3888
                enum iterator_type type = (enum iterator_type)iterator;
1,001✔
3889
                if (iterator_position_validate(*pos, pos_part_count, key,
1,001✔
3890
                                               key_part_count, cmp_def,
3891
                                               type) != 0)
1,001✔
3892
                        return -1;
1,001✔
3893
        } else {
3894
                *pos = NULL;
1,947,110✔
3895
                *pos_end = NULL;
1,947,110✔
3896
        }
3897
        return 0;
1,948,050✔
3898
}
3899

3900
int
3901
box_select(uint32_t space_id, uint32_t index_id,
1,702,950✔
3902
           int iterator, uint32_t offset, uint32_t limit,
3903
           const char *key, const char *key_end,
3904
           const char **packed_pos, const char **packed_pos_end,
3905
           bool update_pos, struct port *port)
3906
{
3907
        (void)key_end;
3908
        assert(!update_pos || (packed_pos != NULL && packed_pos_end != NULL));
1,702,950!
3909
        assert(packed_pos == NULL || packed_pos_end != NULL);
1,702,950!
3910

3911
        rmean_collect(rmean_box, IPROTO_SELECT, 1);
1,702,950!
3912

3913
        if (iterator < 0 || iterator >= iterator_type_MAX) {
1,702,950!
3914
                diag_set(IllegalParams, "Invalid iterator type");
×
3915
                diag_log();
×
3916
                return -1;
×
3917
        }
3918

3919
        struct space *space = space_cache_find(space_id);
1,702,950!
3920
        if (space == NULL)
1,702,950✔
3921
                return -1;
8,356✔
3922
        if (access_check_space(space, PRIV_R) != 0)
1,694,600!
3923
                return -1;
1,603✔
3924
        struct index *index = index_find(space, index_id);
1,692,990!
3925
        if (index == NULL)
1,692,990✔
3926
                return -1;
3✔
3927

3928
        enum iterator_type type = (enum iterator_type) iterator;
1,692,990✔
3929
        const char *key_array = key;
1,692,990✔
3930
        uint32_t part_count = key ? mp_decode_array(&key) : 0;
3,385,980!
3931
        if (key_validate(index->def, type, key, part_count))
1,692,990!
3932
                return -1;
20✔
3933
        const char *pos, *pos_end;
3934
        if (box_iterator_position_unpack(*packed_pos, *packed_pos_end,
3,385,940✔
3935
                                         index->def->cmp_def, key, part_count,
1,692,970!
3936
                                         type, &pos, &pos_end) != 0)
1,692,970✔
3937
                return -1;
32✔
3938

3939
        box_run_on_select(space, index, type, key_array);
1,692,940!
3940

3941
        ERROR_INJECT(ERRINJ_TESTING, {
1,692,940!
3942
                diag_set(ClientError, ER_INJECTION, "ERRINJ_TESTING");
3943
                return -1;
3944
        });
3945

3946
        struct txn *txn;
3947
        struct txn_ro_savepoint svp;
3948
        if (txn_begin_ro_stmt(space, &txn, &svp) != 0)
1,692,940!
3949
                return -1;
41✔
3950

3951
        struct iterator *it = index_create_iterator_with_offset(
1,692,900!
3952
                index, type, key, part_count, pos, offset);
3953
        if (it == NULL) {
1,692,900✔
3954
                txn_end_ro_stmt(txn, &svp);
313!
3955
                return -1;
313✔
3956
        }
3957

3958
        int rc = 0;
1,692,580✔
3959
        uint32_t found = 0;
1,692,580✔
3960
        struct tuple *tuple;
3961
        port_c_create(port);
1,692,580!
3962
        while (found < limit) {
4,905,650✔
3963
                rc = box_check_slice();
4,887,360!
3964
                if (rc != 0)
4,887,360✔
3965
                        break;
3✔
3966
                rc = iterator_next(it, &tuple);
4,887,360!
3967
                if (rc != 0 || tuple == NULL)
4,887,360✔
3968
                        break;
3969
                port_c_add_tuple(port, tuple);
3,213,070!
3970
                found++;
3,213,070✔
3971
                /*
3972
                 * Refresh the pointer to the space, because the space struct
3973
                 * could be freed if the iterator yielded.
3974
                 */
3975
                space = index_weak_ref_get_space(&it->index_ref);
3,213,070!
3976
        }
3977

3978
        txn_end_ro_stmt(txn, &svp);
1,692,580!
3979
        if (rc != 0)
1,692,580✔
3980
                goto fail;
9✔
3981

3982
        if (update_pos) {
1,692,570✔
3983
                uint32_t pos_size;
3984
                /*
3985
                 * Iterator position is extracted even if no tuples were found
3986
                 * to check whether pagination is supported by the index.
3987
                 */
3988
                if (iterator_position(it, &pos, &pos_size) != 0)
707!
3989
                        goto fail;
3✔
3990
                box_iterator_position_pack(pos, pos + pos_size, found,
704!
3991
                                           packed_pos, packed_pos_end);
3992
        }
3993
        iterator_delete(it);
1,692,570!
3994
        return 0;
1,692,570✔
3995
fail:
12✔
3996
        iterator_delete(it);
12!
3997
        port_destroy(port);
12!
3998
        return -1;
12✔
3999
}
4000

4001
/**
4002
 * A special wrapper for FFI - workaround for M1.
4003
 * Use 64-bit integers beyond the 8th argument.
4004
 * See https://github.com/LuaJIT/LuaJIT/issues/205 for details.
4005
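 * Compared to box_select(), iterator, offset and limit are moved to the tail
 * of the argument list and widened to 64-bit integers.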
 */
4006
extern "C" int
4007
box_select_ffi(uint32_t space_id, uint32_t index_id, const char *key,
1,521,050✔
4008
               const char *key_end, const char **packed_pos,
4009
               const char **packed_pos_end, bool update_pos, struct port *port,
4010
               int64_t iterator, uint64_t offset, uint64_t limit)
4011
{
4012
        return box_select(space_id, index_id, iterator, offset, limit, key,
1,521,050✔
4013
                          key_end, packed_pos, packed_pos_end, update_pos,
4014
                          port);
1,521,050✔
4015
}
4016

4017
API_EXPORT int
4018
box_insert(uint32_t space_id, const char *tuple, const char *tuple_end,
2,955,480✔
4019
           box_tuple_t **result)
4020
{
4021
        mp_tuple_assert(tuple, tuple_end);
2,955,480✔
4022
        struct request request;
4023
        memset(&request, 0, sizeof(request));
2,955,480✔
4024
        request.type = IPROTO_INSERT;
2,955,480✔
4025
        request.space_id = space_id;
2,955,480✔
4026
        request.tuple = tuple;
2,955,480✔
4027
        request.tuple_end = tuple_end;
2,955,480✔
4028
        return box_process1(&request, result);
5,910,960!
4029
}
4030

4031
API_EXPORT int
4032
box_replace(uint32_t space_id, const char *tuple, const char *tuple_end,
6,823,440✔
4033
            box_tuple_t **result)
4034
{
4035
        mp_tuple_assert(tuple, tuple_end);
6,823,440✔
4036
        struct request request;
4037
        memset(&request, 0, sizeof(request));
6,823,440✔
4038
        request.type = IPROTO_REPLACE;
6,823,440✔
4039
        request.space_id = space_id;
6,823,440✔
4040
        request.tuple = tuple;
6,823,440✔
4041
        request.tuple_end = tuple_end;
6,823,440✔
4042
        return box_process1(&request, result);
13,646,900!
4043
}
4044

4045
API_EXPORT int
4046
box_delete(uint32_t space_id, uint32_t index_id, const char *key,
1,175,670✔
4047
           const char *key_end, box_tuple_t **result)
4048
{
4049
        mp_tuple_assert(key, key_end);
1,175,670✔
4050
        struct request request;
4051
        memset(&request, 0, sizeof(request));
1,175,670✔
4052
        request.type = IPROTO_DELETE;
1,175,670✔
4053
        request.space_id = space_id;
1,175,670✔
4054
        request.index_id = index_id;
1,175,670✔
4055
        request.key = key;
1,175,670✔
4056
        request.key_end = key_end;
1,175,670✔
4057
        return box_process1(&request, result);
2,351,340!
4058
}
4059

4060
API_EXPORT int
4061
box_update(uint32_t space_id, uint32_t index_id, const char *key,
401,098✔
4062
           const char *key_end, const char *ops, const char *ops_end,
4063
           int index_base, box_tuple_t **result)
4064
{
4065
        mp_tuple_assert(key, key_end);
401,098✔
4066
        mp_tuple_assert(ops, ops_end);
401,098✔
4067
        struct request request;
4068
        memset(&request, 0, sizeof(request));
401,098✔
4069
        request.type = IPROTO_UPDATE;
401,098✔
4070
        request.space_id = space_id;
401,098✔
4071
        request.index_id = index_id;
401,098✔
4072
        request.key = key;
401,098✔
4073
        request.key_end = key_end;
401,098✔
4074
        request.index_base = index_base;
401,098✔
4075
        /** Legacy: in case of update, ops are passed in in request tuple */
4076
        request.tuple = ops;
401,098✔
4077
        request.tuple_end = ops_end;
401,098✔
4078
        return box_process1(&request, result);
802,196!
4079
}
4080

4081
API_EXPORT int
4082
box_upsert(uint32_t space_id, uint32_t index_id, const char *tuple,
98,919✔
4083
           const char *tuple_end, const char *ops, const char *ops_end,
4084
           int index_base, box_tuple_t **result)
4085
{
4086
        mp_tuple_assert(ops, ops_end);
98,919✔
4087
        mp_tuple_assert(tuple, tuple_end);
98,919✔
4088
        struct request request;
4089
        memset(&request, 0, sizeof(request));
98,919✔
4090
        request.type = IPROTO_UPSERT;
98,919✔
4091
        request.space_id = space_id;
98,919✔
4092
        request.index_id = index_id;
98,919✔
4093
        request.ops = ops;
98,919✔
4094
        request.ops_end = ops_end;
98,919✔
4095
        request.tuple = tuple;
98,919✔
4096
        request.tuple_end = tuple_end;
98,919✔
4097
        request.index_base = index_base;
98,919✔
4098
        return box_process1(&request, result);
197,838!
4099
}
4100

4101
API_EXPORT int
4102
box_insert_arrow(uint32_t space_id, struct ArrowArray *array,
2✔
4103
                 struct ArrowSchema *schema)
4104
{
4105
        struct request request;
4106
        memset(&request, 0, sizeof(request));
2✔
4107
        request.type = IPROTO_INSERT_ARROW;
2✔
4108
        request.space_id = space_id;
2✔
4109
        request.arrow_array = array;
2✔
4110
        request.arrow_schema = schema;
2✔
4111
        return box_process1(&request, NULL);
4!
4112
}
4113

4114
/**
4115
 * Trigger space truncation by bumping a counter
4116
 * in the _truncate space.
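 * The request built below is an upsert of the tuple {space_id, 1} into
 * _truncate with the update operation {"+", 1, 1}, i.e. "increment the
 * counter field by one" (field numbers are zero-based here).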
4117
 */
4118
static void
4119
space_truncate(struct space *space)
2,672✔
4120
{
4121
        size_t buf_size = 3 * mp_sizeof_array(UINT32_MAX) +
2,672✔
4122
                          4 * mp_sizeof_uint(UINT64_MAX) + mp_sizeof_str(1);
2,672✔
4123
        RegionGuard region_guard(&fiber()->gc);
5,344!
4124
        char *buf = (char *)xregion_alloc(&fiber()->gc, buf_size);
2,672!
4125

4126
        char *tuple_buf = buf;
2,672✔
4127
        char *tuple_buf_end = tuple_buf;
2,672✔
4128
        tuple_buf_end = mp_encode_array(tuple_buf_end, 2);
2,672✔
4129
        tuple_buf_end = mp_encode_uint(tuple_buf_end, space_id(space));
2,672✔
4130
        tuple_buf_end = mp_encode_uint(tuple_buf_end, 1);
2,672✔
4131
        assert(tuple_buf_end < buf + buf_size);
2,672!
4132

4133
        char *ops_buf = tuple_buf_end;
2,672✔
4134
        char *ops_buf_end = ops_buf;
2,672✔
4135
        ops_buf_end = mp_encode_array(ops_buf_end, 1);
2,672✔
4136
        ops_buf_end = mp_encode_array(ops_buf_end, 3);
2,672✔
4137
        ops_buf_end = mp_encode_str(ops_buf_end, "+", 1);
2,672✔
4138
        ops_buf_end = mp_encode_uint(ops_buf_end, 1);
2,672✔
4139
        ops_buf_end = mp_encode_uint(ops_buf_end, 1);
2,672✔
4140
        assert(ops_buf_end < buf + buf_size);
2,672!
4141

4142
        if (box_upsert(BOX_TRUNCATE_ID, 0, tuple_buf, tuple_buf_end,
2,672✔
4143
                       ops_buf, ops_buf_end, 0, NULL) != 0)
2,672✔
4144
                diag_raise();
35✔
4145
}
2,637✔
4146

4147
API_EXPORT int
4148
box_truncate(uint32_t space_id)
2,672✔
4149
{
4150
        try {
4151
                struct space *space = space_cache_find_xc(space_id);
2,672!
4152
                space_truncate(space);
2,672✔
4153
                return 0;
2,637✔
4154
        } catch (Exception *exc) {
35✔
4155
                return -1;
35✔
4156
        }
4157
}
4158

4159
/** Update a record in _sequence_data space. */
4160
static int
4161
sequence_data_update(uint32_t seq_id, int64_t value)
129✔
4162
{
4163
        size_t tuple_buf_size = (mp_sizeof_array(2) +
129✔
4164
                                 2 * mp_sizeof_uint(UINT64_MAX));
129✔
4165
        RegionGuard region_guard(&fiber()->gc);
258!
4166
        char *tuple_buf = (char *) region_alloc(&fiber()->gc, tuple_buf_size);
129!
4167
        if (tuple_buf == NULL) {
129!
4168
                diag_set(OutOfMemory, tuple_buf_size, "region", "tuple");
×
4169
                return -1;
×
4170
        }
4171
        char *tuple_buf_end = tuple_buf;
129✔
4172
        tuple_buf_end = mp_encode_array(tuple_buf_end, 2);
129✔
4173
        tuple_buf_end = mp_encode_uint(tuple_buf_end, seq_id);
129✔
4174
        tuple_buf_end = (value < 0 ?
129✔
4175
                         mp_encode_int(tuple_buf_end, value) :
18✔
4176
                         mp_encode_uint(tuple_buf_end, value));
111✔
4177
        assert(tuple_buf_end < tuple_buf + tuple_buf_size);
129!
4178

4179
        struct credentials *orig_credentials = effective_user();
129!
4180
        fiber_set_user(fiber(), &admin_credentials);
129!
4181

4182
        int rc = box_replace(BOX_SEQUENCE_DATA_ID,
129✔
4183
                             tuple_buf, tuple_buf_end, NULL);
4184

4185
        fiber_set_user(fiber(), orig_credentials);
129!
4186
        return rc;
129✔
4187
}
4188

4189
/** Delete a record from _sequence_data space. */
4190
static int
4191
sequence_data_delete(uint32_t seq_id)
13✔
4192
{
4193
        size_t key_buf_size = mp_sizeof_array(1) + mp_sizeof_uint(UINT64_MAX);
13✔
4194
        RegionGuard region_guard(&fiber()->gc);
26!
4195
        char *key_buf = (char *) region_alloc(&fiber()->gc, key_buf_size);
13!
4196
        if (key_buf == NULL) {
13!
4197
                diag_set(OutOfMemory, key_buf_size, "region", "key");
×
4198
                return -1;
×
4199
        }
4200
        char *key_buf_end = key_buf;
13✔
4201
        key_buf_end = mp_encode_array(key_buf_end, 1);
13✔
4202
        key_buf_end = mp_encode_uint(key_buf_end, seq_id);
13✔
4203
        assert(key_buf_end < key_buf + key_buf_size);
13!
4204

4205
        struct credentials *orig_credentials = effective_user();
13!
4206
        fiber_set_user(fiber(), &admin_credentials);
13!
4207

4208
        int rc = box_delete(BOX_SEQUENCE_DATA_ID, 0,
13✔
4209
                            key_buf, key_buf_end, NULL);
4210

4211
        fiber_set_user(fiber(), orig_credentials);
13!
4212
        return rc;
13✔
4213
}
4214

4215
API_EXPORT int
4216
box_sequence_next(uint32_t seq_id, int64_t *result)
125✔
4217
{
4218
        struct sequence *seq = sequence_cache_find(seq_id);
125!
4219
        if (seq == NULL)
125✔
4220
                return -1;
1✔
4221
        if (access_check_sequence(seq) != 0)
124!
4222
                return -1;
3✔
4223
        int64_t value;
4224
        if (sequence_next(seq, &value) != 0)
121!
4225
                return -1;
6✔
4226
        if (sequence_data_update(seq_id, value) != 0)
115!
4227
                return -1;
×
4228
        *result = value;
115✔
4229
        return 0;
115✔
4230
}
4231

4232
API_EXPORT int
4233
box_sequence_current(uint32_t seq_id, int64_t *result)
13✔
4234
{
4235
        struct sequence *seq = sequence_cache_find(seq_id);
13✔
4236
        if (seq == NULL)
13!
4237
                return -1;
×
4238
        if (access_check_sequence(seq) != 0)
13!
4239
                return -1;
×
4240
        if (sequence_get_value(seq, result) != 0)
13✔
4241
                return -1;
6✔
4242
        return 0;
7✔
4243
}
4244

4245
API_EXPORT int
4246
box_sequence_set(uint32_t seq_id, int64_t value)
21✔
4247
{
4248
        struct sequence *seq = sequence_cache_find(seq_id);
21✔
4249
        if (seq == NULL)
21!
4250
                return -1;
×
4251
        if (access_check_sequence(seq) != 0)
21✔
4252
                return -1;
7✔
4253
        if (sequence_set(seq, value) != 0)
14!
4254
                return -1;
×
4255
        return sequence_data_update(seq_id, value);
14✔
4256
}
4257

4258
API_EXPORT int
4259
box_sequence_reset(uint32_t seq_id)
17✔
4260
{
4261
        struct sequence *seq = sequence_cache_find(seq_id);
17✔
4262
        if (seq == NULL)
17✔
4263
                return -1;
1✔
4264
        if (access_check_sequence(seq) != 0)
16✔
4265
                return -1;
3✔
4266
        sequence_reset(seq);
13✔
4267
        return sequence_data_delete(seq_id);
13✔
4268
}
4269
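A short usage sketch of the sequence C API functions above, assuming seq_id refers to an existing sequence the current user is allowed to access (the helper name and header are hypothetical):

#include "module.h"    /* assumed: box_sequence_next(), box_sequence_current(), box_sequence_reset() */

static int
sequence_example(uint32_t seq_id)
{
        /* Advance the sequence; the new value is persisted in _sequence_data. */
        int64_t value;
        if (box_sequence_next(seq_id, &value) != 0)
                return -1;
        /* Read the last generated value without advancing the sequence. */
        int64_t current;
        if (box_sequence_current(seq_id, &current) != 0)
                return -1;
        /* Rewind the sequence and delete its record from _sequence_data. */
        return box_sequence_reset(seq_id);
}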

4270
API_EXPORT int
4271
box_session_push(const char *data, const char *data_end)
7✔
4272
{
4273
        struct session *session = current_session();
7!
4274
        if (session == NULL)
7!
4275
                return -1;
×
4276
        if (session_push_check_deprecation() != 0)
7!
4277
                return -1;
2✔
4278
        struct port_msgpack port;
4279
        struct port *base = (struct port *)&port;
5✔
4280
        port_msgpack_create(base, data, data_end - data);
5!
4281
        int rc = session_push(session, base);
5!
4282
        port_msgpack_destroy(base);
5!
4283
        return rc;
5✔
4284
}
4285

4286
API_EXPORT uint64_t
4287
box_session_id(void)
636,680✔
4288
{
4289
        return current_session()->id;
636,680✔
4290
}
4291

4292
API_EXPORT int
4293
box_iproto_send(uint64_t sid,
112✔
4294
                const char *header, const char *header_end,
4295
                const char *body, const char *body_end)
4296
{
4297
        struct session *session = session_find(sid);
112✔
4298
        if (session == NULL) {
112✔
4299
                diag_set(ClientError, ER_NO_SUCH_SESSION, sid);
1!
4300
                return -1;
1✔
4301
        }
4302
        if (session->type != SESSION_TYPE_BINARY) {
111✔
4303
                diag_set(ClientError, ER_WRONG_SESSION_TYPE,
1!
4304
                         session_type_strs[session->type]);
4305
                return -1;
1✔
4306
        }
4307
        return iproto_session_send(session, header, header_end, body, body_end);
110✔
4308
}
4309

4310
API_EXPORT int
4311
box_iproto_override(uint32_t req_type, iproto_handler_t handler,
4✔
4312
                    iproto_handler_destroy_t destroy, void *ctx)
4313
{
4314
        return iproto_override(req_type, handler, destroy, ctx);
4✔
4315
}
4316

4317
API_EXPORT int64_t
4318
box_info_lsn(void)
1,242✔
4319
{
4320
        /*
4321
         * Self can be NULL during bootstrap: the entire box.info
4322
         * bundle becomes available soon after entering box.cfg{}
4323
         * and replication bootstrap relies on this as it looks
4324
         * at box.info.status.
4325
         */
4326
        struct replica *self = replica_by_uuid(&INSTANCE_UUID);
1,242✔
4327
        if (self != NULL &&
1,242✔
4328
            (self->id != REPLICA_ID_NIL || cfg_replication_anon)) {
1,097✔
4329
                return vclock_get(box_vclock, self->id);
875✔
4330
        } else {
4331
                return -1;
367✔
4332
        }
4333
}
4334

4335
/**
4336
 * Get memtx status information for box.slab.info
4337
 */
4338
API_EXPORT uint64_t
4339
box_slab_info(enum box_slab_info_type type)
504✔
4340
{
4341
        struct memtx_engine *memtx;
4342
        memtx = (struct memtx_engine *)engine_by_name("memtx");
504!
4343

4344
        struct allocator_stats stats;
4345
        memset(&stats, 0, sizeof(stats));
504✔
4346

4347
        allocators_stats(&stats);
504!
4348
        struct mempool_stats index_stats;
4349
        mempool_stats(&memtx->index_extent_pool, &index_stats);
504!
4350

4351
        switch (type) {
504!
4352
        case BOX_SLAB_INFO_ITEMS_SIZE:
84✔
4353
                return stats.small.total + stats.sys.total;
84✔
4354
        case BOX_SLAB_INFO_ITEMS_USED:
84✔
4355
                return stats.small.used + stats.sys.used;
84✔
4356
        case BOX_SLAB_INFO_ARENA_SIZE:
84✔
4357
                /*
4358
                 * We could use stats.small.used + index_stats.totals.used
4359
                 * here, but this would not account for slabs which are
4360
                 * sitting in slab cache or in the arena, available for reuse.
4361
                 * Make sure that the simple formula items_used_ratio > 0.9 &&
4362
                 * arena_used_ratio > 0.9 && quota_used_ratio > 0.9 works as
4363
                 * an indicator for reaching the Tarantool memory limit.
4364
                 */
4365
                return memtx->arena.used;
84✔
4366
        case BOX_SLAB_INFO_ARENA_USED:
84✔
4367
                /** System allocator does not use arena. */
4368
                return stats.small.used + index_stats.totals.used;
84✔
4369
        case BOX_SLAB_INFO_QUOTA_SIZE:
84✔
4370
                return quota_total(&memtx->quota);
84✔
4371
        case BOX_SLAB_INFO_QUOTA_USED:
84✔
4372
                return quota_used(&memtx->quota);
84✔
4373
        default:
×
4374
                return 0;
×
4375
        };
4376
}
4377
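The comment above suggests treating the three used/size ratios as an indicator of approaching the memory limit. A hedged monitoring sketch built on box_slab_info() (the helper itself is not part of this file, the header name is an assumption, and the 0.9 threshold is taken from the comment):

#include <stdbool.h>
#include "module.h"    /* assumed: box_slab_info() and the BOX_SLAB_INFO_* constants exported above */

/* Returns true when memtx looks close to its configured memory limit. */
static bool
memtx_is_almost_full(void)
{
        double items_used_ratio =
                (double)box_slab_info(BOX_SLAB_INFO_ITEMS_USED) /
                box_slab_info(BOX_SLAB_INFO_ITEMS_SIZE);
        double arena_used_ratio =
                (double)box_slab_info(BOX_SLAB_INFO_ARENA_USED) /
                box_slab_info(BOX_SLAB_INFO_ARENA_SIZE);
        double quota_used_ratio =
                (double)box_slab_info(BOX_SLAB_INFO_QUOTA_USED) /
                box_slab_info(BOX_SLAB_INFO_QUOTA_SIZE);
        return items_used_ratio > 0.9 && arena_used_ratio > 0.9 &&
               quota_used_ratio > 0.9;
}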

4378
/**
4379
 * Insert replica record into _cluster space, bypassing all checks like whether
4380
 * the instance is writable. It makes the function usable by the bootstrap master
4381
 * when it is read-only but has to register itself.
4382
 */
4383
static void
4384
box_insert_replica_record(uint32_t id, const struct tt_uuid *uuid,
3,585✔
4385
                          const char *name)
4386
{
4387
        bool ok;
4388
        if (*name == 0) {
3,585✔
4389
                ok = boxk(IPROTO_INSERT, BOX_CLUSTER_ID, "[%u%s]",
3,049✔
4390
                          (unsigned)id, tt_uuid_str(uuid)) == 0;
4391
        } else {
4392
                ok = boxk(IPROTO_INSERT, BOX_CLUSTER_ID, "[%u%s%s]",
536✔
4393
                          (unsigned)id, tt_uuid_str(uuid), name) == 0;
4394
        }
4395
        if (!ok)
3,585✔
4396
                diag_raise();
2✔
4397
        struct replica *new_replica = replica_by_uuid(uuid);
3,583✔
4398
        if (new_replica == NULL || new_replica->id != id)
3,583!
4399
                say_warn("Replica ID is changed by a trigger");
2!
4400
}
3,583✔
4401

4402
static void
4403
box_register_replica(const struct tt_uuid *uuid,
742✔
4404
                     const char *name)
4405
{
4406
        struct replica *replica = replica_by_uuid(uuid);
742!
4407
        /*
4408
         * Find required changes.
4409
         */
4410
        bool need_name = false;
742✔
4411
        bool need_id = false;
742✔
4412
        if (replica == NULL) {
742!
4413
                need_name = *name != 0;
×
4414
                need_id = true;
×
4415
        } else {
4416
                need_name = strcmp(replica->name, name) != 0;
742✔
4417
                need_id = replica->id == REPLICA_ID_NIL;
742✔
4418
        }
4419
        if (!need_name && !need_id)
742✔
4420
                return;
32✔
4421
        /*
4422
         * Apply the changes.
4423
         */
4424
        box_check_writable_xc();
732!
4425
        if (!need_id) {
732✔
4426
                int rc;
4427
                if (*name == 0) {
22!
4428
                        rc = boxk(IPROTO_UPDATE, BOX_CLUSTER_ID,
×
4429
                                  "[%u][[%s%dNIL]]", (unsigned)replica->id, "=",
×
4430
                                  BOX_CLUSTER_FIELD_NAME);
4431
                } else {
4432
                        rc = boxk(IPROTO_UPDATE, BOX_CLUSTER_ID,
22✔
4433
                                  "[%u][[%s%d%s]]", (unsigned)replica->id, "=",
22!
4434
                                  BOX_CLUSTER_FIELD_NAME, name);
4435
                }
4436
                if (rc != 0)
22✔
4437
                        diag_raise();
2✔
4438
                return;
20✔
4439
        }
4440
        struct replica *other = replica_by_name(name);
710!
4441
        if (other != NULL && other != replica) {
710!
4442
                if (boxk(IPROTO_UPDATE, BOX_CLUSTER_ID, "[%u][[%s%d%s]]",
2!
4443
                         (unsigned)other->id, "=", BOX_CLUSTER_FIELD_UUID,
2!
4444
                         tt_uuid_str(uuid)) != 0)
2!
4445
                        diag_raise();
×
4446
                return;
2✔
4447
        }
4448
        uint32_t replica_id;
4449
        if (replica_find_new_id(&replica_id) != 0)
708!
4450
                diag_raise();
×
4451
        box_insert_replica_record(replica_id, uuid, name);
708✔
4452
}
4453

4454
int
4455
box_process_auth(struct auth_request *request,
2,988✔
4456
                 const char *salt, uint32_t salt_len)
4457
{
4458
        assert(salt_len >= AUTH_SALT_SIZE);
2,988!
4459
        (void)salt_len;
4460

4461
        rmean_collect(rmean_box, IPROTO_AUTH, 1);
2,988!
4462

4463
        /* Check that bootstrap has been finished */
4464
        if (!is_box_configured) {
2,988✔
4465
                diag_set(ClientError, ER_LOADING);
1,605!
4466
                return -1;
1,605✔
4467
        }
4468

4469
        const char *user = request->user_name;
1,383✔
4470
        uint32_t len = mp_decode_strl(&user);
1,383✔
4471
        if (authenticate(user, len, salt, request->scramble) != 0)
1,383!
4472
                return -1;
136✔
4473
        return 0;
1,247✔
4474
}
4475

4476
/**
4477
 * Replica's connection guard:
4478
 * 1. Ensures that the replica has only one connection at a time.
4479
 * 2. Calls the replica's disconnection callback in destructor.
4480
 */
4481
struct ReplicaConnectionGuard {
4482
        ReplicaConnectionGuard(struct replica *replica)
3,268✔
4483
        {
3,268✔
4484
                replica_ = replica;
3,268✔
4485
                if (replica == NULL)
3,268✔
4486
                        return;
4✔
4487
                if (replica->has_incoming_connection) {
3,264✔
4488
                        tnt_raise(ClientError, ER_CFG, "replication",
362!
4489
                                  "duplicate connection with the same replica "
4490
                                  "UUID");
4491
                }
4492
                replica->has_incoming_connection = true;
2,902✔
4493
        }
4494
        ~ReplicaConnectionGuard()
5,807✔
4495
        {
2,901✔
4496
                if (replica_ == NULL)
5,807✔
4497
                        return;
2,906✔
4498
                assert(replica_->has_incoming_connection);
2,901!
4499
                replica_->has_incoming_connection = false;
2,901✔
4500
                replica_on_disconnect(replica_);
2,901✔
4501
        }
5,807✔
4502
        ReplicaConnectionGuard(const ReplicaConnectionGuard &other) = delete;
4503
        ReplicaConnectionGuard(ReplicaConnectionGuard &&other)
2,902✔
4504
        {
2,902✔
4505
                replica_ = other.replica_;
2,902✔
4506
                other.replica_ = NULL;
2,902✔
4507
        }
2,902✔
4508
private:
4509
        /** Connected replica. */
4510
        struct replica *replica_;
4511
};
4512

4513
/**
4514
 * A helper for replication endpoints to handle a connecting replica.
4515
 * 1. Checks if the replica can be connected - replica has read permission
4516
 *    and WAL is enabled.
4517
 * 2. Checks if the replica doesn't have another connection.
4518
 * 3. Creates an anonymous replica object if it doesn't exist.
4519
 * 4. Handles WAL GC state of the replica. It's important that if the replica
4520
 *    already has a WAL GC consumer, it is re-created and will never be deleted,
4521
 *    so the replica won't lose its consumer, even in the case of an error.
4522
 * 5. Returns a guard that protects against duplicate replica connection.
4523
 */
4524
NODISCARD static ReplicaConnectionGuard
4525
box_connect_replica(const struct tt_uuid *uuid, const struct vclock *gc_vclock,
3,272✔
4526
                    struct replica **out)
4527
{
4528
        /* Check permissions */
4529
        access_check_universe_xc(PRIV_R);
3,272✔
4530

4531
        /* Forbid replication with disabled WAL */
4532
        if (wal_mode() == WAL_NONE) {
3,269!
4533
                tnt_raise(ClientError, ER_UNSUPPORTED, "Replication",
1!
4534
                          "wal_mode = 'none'");
4535
        }
4536

4537
        /* No replica object with nil UUID. */
4538
        if (tt_uuid_is_nil(uuid)) {
3,268✔
4539
                *out = NULL;
4✔
4540
                return ReplicaConnectionGuard(NULL);
4!
4541
        }
4542

4543
        struct replica *replica = replica_by_uuid(uuid);
3,264!
4544
        if (replica == NULL)
3,264✔
4545
                replica = replicaset_add_anon(uuid);
418!
4546

4547
        ReplicaConnectionGuard guard(replica);
6,166✔
4548

4549
        if (replica->gc != NULL)
2,902✔
4550
                gc_consumer_unregister(replica->gc);
1,254!
4551
        replica->gc = gc_consumer_register(gc_vclock,
5,804✔
4552
                                           GC_CONSUMER_REPLICA,
4553
                                           &replica->uuid);
2,902!
4554
        if (gc_consumer_persist(replica->gc) != 0)
2,902!
4555
                diag_raise();
×
4556
        if (replica->gc_checkpoint_ref != NULL) {
2,902!
4557
                gc_unref_checkpoint(replica->gc_checkpoint_ref);
×
4558
                replica->gc_checkpoint_ref = NULL;
×
4559
        }
4560
        *out = replica;
2,902✔
4561
        return guard;
2,902✔
4562
}
4563

4564
void
4565
box_process_fetch_snapshot(struct iostream *io,
45✔
4566
                           const struct xrow_header *header)
4567
{
4568
        assert(header->type == IPROTO_FETCH_SNAPSHOT);
45!
4569

4570
        struct fetch_snapshot_request req;
4571
        xrow_decode_fetch_snapshot_xc(header, &req);
45!
4572

4573
        /* Check that bootstrap has been finished */
4574
        if (!is_box_configured)
45✔
4575
                tnt_raise(ClientError, ER_LOADING);
4!
4576

4577
        /*
4578
         * Find checkpoint for checkpoint join. If replica didn't request
4579
         * specific one, take the newest one. Initialize checkpoint cursor
4580
         * with chosen checkpoint and use its vclock for WAL GC consumer.
4581
         * If requested checkpoint is not found, raise an error.
4582
         */
4583
        struct checkpoint_cursor cursor;
4584
        struct checkpoint_cursor *cursor_ptr = NULL;
41✔
4585
        struct gc_checkpoint *checkpoint = NULL;
41✔
4586
        const struct vclock *gc_vclock = instance_vclock;
41✔
4587
        if (req.is_checkpoint_join) {
41✔
4588
                if (vclock_is_set(&req.checkpoint_vclock)) {
2!
4589
                        checkpoint =
4590
                                gc_checkpoint_at_vclock(&req.checkpoint_vclock);
×
4591
                } else {
4592
                        checkpoint = gc_last_checkpoint();
2✔
4593
                }
4594
                if (checkpoint == NULL)
2!
4595
                        tnt_raise(ClientError, ER_MISSING_SNAPSHOT);
×
4596
                memset(&cursor, 0, sizeof(cursor));
2✔
4597
                cursor.vclock = &checkpoint->vclock;
2✔
4598
                cursor.start_lsn = req.checkpoint_lsn;
2✔
4599
                cursor_ptr = &cursor;
2✔
4600
                gc_vclock = &checkpoint->vclock;
2✔
4601
        }
4602

4603
        struct replica *replica = NULL;
41✔
4604
        auto replica_guard = box_connect_replica(&req.instance_uuid,
4605
                                                 gc_vclock, &replica);
81✔
4606
        /* Reference checkpoint in case of checkpoint join. */
4607
        struct gc_checkpoint_ref *checkpoint_ref = NULL;
40✔
4608
        auto gc_checkpoint_ref_guard = make_scoped_guard([&]() {
40✔
4609
                if (checkpoint_ref != NULL)
40✔
4610
                        gc_unref_checkpoint(checkpoint_ref);
1✔
4611
        });
80!
4612
        if (checkpoint != NULL) {
40✔
4613
                if (replica == NULL) {
2✔
4614
                        checkpoint_ref = gc_ref_checkpoint(
1!
4615
                                checkpoint, "checkpoint join");
4616
                } else {
4617
                        replica->gc_checkpoint_ref = gc_ref_checkpoint(
1!
4618
                                checkpoint, "checkpoint join of replica %s",
4619
                                tt_uuid_str(&replica->uuid));
1!
4620
                }
4621
        }
4622

4623
        /* Send the snapshot data to the instance. */
4624
        say_info("sending read-view to replica at %s", sio_socketname(io->fd));
40!
4625
        struct vclock start_vclock;
4626
        relay_initial_join(io, header->sync, &start_vclock, req.version_id,
40✔
4627
                           cursor_ptr);
4628
        say_info("read-view sent.");
37!
4629

4630
        /* Remember master's vclock after the last request */
4631
        struct vclock stop_vclock;
4632
        vclock_copy(&stop_vclock, instance_vclock);
37✔
4633

4634
        /* Send end of snapshot data marker */
4635
        struct xrow_header row;
4636
        RegionGuard region_guard(&fiber()->gc);
74!
4637
        xrow_encode_vclock_ignore0(&row, &stop_vclock);
37!
4638
        row.sync = header->sync;
37✔
4639
        coio_write_xrow(io, &row);
37!
4640
}
37✔
4641

4642
/**
4643
 * Replica vclock is used in gc state and recovery initialization - need to
4644
 * replace the remote 0-th component with our own one. This doesn't break
4645
 * recovery: it finds the WAL with a vclock strictly less than the replica clock in
4646
 * all components except the 0th one.
4647
 *
4648
 * Note that it would be bad to set the 0-th component to a smaller value (like
4649
 * zero) - it would unnecessarily require additional WALs, which may have
4650
 * already been deleted.
4651
 *
4652
 * Speaking of gc, remote instances' local vclock components are not used by
4653
 * consumers at all.
4654
 */
4655
static void
4656
box_localize_vclock(const struct vclock *remote, struct vclock *local)
2,518✔
4657
{
4658
        vclock_copy(local, remote);
2,518✔
4659
        vclock_reset(local, 0, vclock_get(instance_vclock, 0));
2,518✔
4660
}
2,518✔
4661
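To make the replacement concrete, a hypothetical example (component values invented for illustration):

/*
 * box_localize_vclock() illustration:
 *   remote (replica) vclock .......... {0: 7, 1: 100, 2: 50}
 *   vclock_get(instance_vclock, 0) ... 25
 *   localized result ................. {0: 25, 1: 100, 2: 50}
 * Only the 0-th (local) component is overwritten; the other components,
 * which drive recovery and GC, are taken from the replica as-is.
 */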

4662
void
4663
box_process_register(struct iostream *io, const struct xrow_header *header)
36✔
4664
{
4665
        assert(header->type == IPROTO_REGISTER);
36!
4666

4667
        struct register_request req;
4668
        xrow_decode_register_xc(header, &req);
36!
4669

4670
        if (!is_box_configured)
36!
4671
                tnt_raise(ClientError, ER_LOADING);
×
4672

4673
        if (tt_uuid_is_equal(&req.instance_uuid, &INSTANCE_UUID))
36!
4674
                tnt_raise(ClientError, ER_CONNECTION_TO_SELF);
×
4675

4676
        /*
4677
         * We only get register requests from instances which need some actual
4678
         * registration - name, id.
4679
         */
4680
        struct replica *replica = replica_by_uuid(&req.instance_uuid);
36!
4681
        bool need_id = false;
36✔
4682
        bool need_name = false;
36✔
4683
        if (replica == NULL) {
36✔
4684
                need_id = true;
1✔
4685
                need_name = *req.instance_name != 0;
1✔
4686
        } else {
4687
                need_id = replica->id == REPLICA_ID_NIL;
35✔
4688
                need_name = strcmp(req.instance_name, replica->name) != 0;
35✔
4689
        }
4690
        if (!need_id && !need_name) {
36!
4691
                tnt_raise(ClientError, ER_REPLICA_NOT_ANON,
×
4692
                          tt_uuid_str(&req.instance_uuid));
4693
        }
4694
        if (box_is_anon()) {
36!
4695
                tnt_raise(ClientError, ER_UNSUPPORTED, "Anonymous replica",
×
4696
                          "registration of non-anonymous nodes.");
4697
        }
4698

4699
        /* See box_process_join() */
4700
        box_check_writable_xc();
36!
4701
        struct space *space = space_cache_find_xc(BOX_CLUSTER_ID);
36!
4702
        access_check_space_xc(space, PRIV_W);
36!
4703

4704
        struct vclock start_vclock;
4705
        box_localize_vclock(&req.vclock, &start_vclock);
36✔
4706
        auto replica_guard = box_connect_replica(&req.instance_uuid,
4707
                                                 &start_vclock, &replica);
56✔
4708

4709
        say_info("registering replica %s at %s",
20!
4710
                 tt_uuid_str(&req.instance_uuid), sio_socketname(io->fd));
4711
        ERROR_INJECT_YIELD(ERRINJ_REPLICA_JOIN_DELAY);
195!
4712
        box_register_replica(&req.instance_uuid, req.instance_name);
20✔
4713

4714
        replica = replica_by_uuid(&req.instance_uuid);
19!
4715
        if (replica == NULL)
19!
4716
                tnt_raise(ClientError, ER_CANNOT_REGISTER);
×
4717

4718
        /* Remember master's vclock after the last request */
4719
        struct vclock stop_vclock;
4720
        vclock_copy(&stop_vclock, instance_vclock);
19✔
4721
        /*
4722
         * Feed replica with WALs up to the REGISTER itself so that it gets own
4723
         * registration entry.
4724
         */
4725
        relay_final_join(replica, io, header->sync, &start_vclock,
19✔
4726
                         &stop_vclock);
4727
        say_info("final data sent.");
18!
4728

4729
        RegionGuard region_guard(&fiber()->gc);
36!
4730
        struct xrow_header row;
4731
        /* Send end of WAL stream marker */
4732
        xrow_encode_vclock_ignore0(&row, instance_vclock);
18!
4733
        row.sync = header->sync;
18✔
4734
        coio_write_xrow(io, &row);
18!
4735

4736
        /*
4737
         * Advance the WAL consumer state to the position where
4738
         * registration was completed.
4739
         */
4740
        gc_consumer_advance(replica->gc, &stop_vclock);
18!
4741
}
18✔
4742

4743
void
4744
box_process_join(struct iostream *io, const struct xrow_header *header)
1,060✔
4745
{
4746
        /*
4747
         * Tarantool 1.7 JOIN protocol diagram (gh-1113)
4748
         * =============================================
4749
         *
4750
         * Replica => Master
4751
         *
4752
         * => JOIN { INSTANCE_UUID: replica_uuid }
4753
         * <= OK { VCLOCK: start_vclock }
4754
         *    Replica has enough permissions and master is ready for JOIN.
4755
         *     - start_vclock - master's vclock at the time of join.
4756
         *
4757
         * <= INSERT
4758
         *    ...
4759
         *    Initial data: a stream of engine-specific rows, e.g. snapshot
4760
         *    rows for memtx or dirty cursor data for Vinyl fed from a
4761
         *    read-view. Engine can use REPLICA_ID, LSN and other fields
4762
         *    for internal purposes.
4763
         *    ...
4764
         * <= INSERT
4765
         * <= OK { VCLOCK: stop_vclock } - end of initial JOIN stage.
4766
         *     - `stop_vclock` - master's vclock when it is done
4767
         *     sending rows from the snapshot (i.e. the vclock
4768
         *     for the end of the final join).
4769
         *
4770
         * <= INSERT/REPLACE/UPDATE/UPSERT/DELETE { REPLICA_ID, LSN }
4771
         *    ...
4772
         *    Final data: a stream of WAL rows from `start_vclock` to
4773
         *    `stop_vclock`, inclusive. REPLICA_ID and LSN fields are
4774
         *    original values from WAL and master-master replication.
4775
         *    ...
4776
         * <= INSERT/REPLACE/UPDATE/UPSERT/DELETE { REPLICA_ID, LSN }
4777
         * <= OK { VCLOCK: current_vclock } - end of final JOIN stage.
4778
         *      - `current_vclock` - master's vclock after final stage.
4779
         *
4780
         * All packets must have the same SYNC value as the initial JOIN request.
4781
         * Master can send ERROR at any time. Replica doesn't confirm rows
4782
         * by OKs. Either initial or final stream includes:
4783
         *  - Cluster UUID in _schema space
4784
         *  - Registration of master in _cluster space
4785
         *  - Registration of the new replica in _cluster space
4786
         */
4787

4788
        assert(header->type == IPROTO_JOIN);
1,060!
4789

4790
        struct join_request req;
4791
        xrow_decode_join_xc(header, &req);
1,060✔
4792

4793
        /* Check that bootstrap has been finished */
4794
        if (!is_box_configured)
1,059✔
4795
                tnt_raise(ClientError, ER_LOADING);
332!
4796

4797
        /* Forbid connection to itself */
4798
        if (tt_uuid_is_equal(&req.instance_uuid, &INSTANCE_UUID))
727!
4799
                tnt_raise(ClientError, ER_CONNECTION_TO_SELF);
×
4800

4801
        if (box_is_anon()) {
727!
4802
                tnt_raise(ClientError, ER_UNSUPPORTED, "Anonymous replica",
4!
4803
                          "registration of non-anonymous nodes.");
4804
        }
4805

4806
        /*
4807
         * Unless already registered, the new replica will be added to _cluster
4808
         * space once the initial join stage is complete. Fail early if the
4809
         * caller does not have the appropriate access privileges or the request
4810
         * has some other obvious error, so as not to waste time on the long
4811
         * join process.
4812
         */
4813
        struct replica *replica = replica_by_uuid(&req.instance_uuid);
723!
4814
        if (replica == NULL || replica->id == REPLICA_ID_NIL ||
723✔
4815
            strcmp(replica->name, req.instance_name) != 0) {
10!
4816
                box_check_writable_xc();
713✔
4817
                struct space *space = space_cache_find_xc(BOX_CLUSTER_ID);
708!
4818
                access_check_space_xc(space, PRIV_W);
708✔
4819
        }
4820
        if ((replica == NULL && *req.instance_name != 0) ||
715✔
4821
            (replica != NULL &&
704✔
4822
             strcmp(replica->name, req.instance_name) != 0)) {
354✔
4823
                struct replica *other = replica_by_name(req.instance_name);
180!
4824
                if (other != NULL && other != replica &&
184!
4825
                    replica_has_connections(other)) {
4!
4826
                        tnt_raise(ClientError, ER_INSTANCE_NAME_DUPLICATE,
2!
4827
                                  node_name_str(req.instance_name),
4828
                                  tt_uuid_str(&other->uuid));
4829
                }
4830
        }
4831

4832
        auto replica_guard = box_connect_replica(&req.instance_uuid,
4833
                                                 instance_vclock, &replica);
1,425✔
4834

4835
        say_info("joining replica %s at %s",
712!
4836
                 tt_uuid_str(&req.instance_uuid), sio_socketname(io->fd));
4837

4838
        /*
4839
         * Initial stream: feed replica with dirty data from engines.
4840
         */
4841
        struct vclock start_vclock;
4842
        relay_initial_join(io, header->sync, &start_vclock, req.version_id,
712✔
4843
                           NULL);
4844
        say_info("initial data sent.");
711!
4845
        /**
4846
         * Register the replica after sending the last row but before sending
4847
         * OK - if the registration fails, the error reaches the client.
4848
         */
4849
        box_register_replica(&req.instance_uuid, req.instance_name);
711✔
4850

4851
        ERROR_INJECT_YIELD(ERRINJ_REPLICA_JOIN_DELAY);
745!
4852

4853
        replica = replica_by_uuid(&req.instance_uuid);
710!
4854
        if (replica == NULL)
710!
4855
                tnt_raise(ClientError, ER_CANNOT_REGISTER);
×
4856

4857
        /* Remember master's vclock after the last request */
4858
        struct vclock stop_vclock;
4859
        vclock_copy(&stop_vclock, instance_vclock);
710✔
4860
        /* Send end of initial stage data marker */
4861
        struct xrow_header row;
4862
        RegionGuard region_guard(&fiber()->gc);
1,420!
4863
        xrow_encode_vclock_ignore0(&row, &stop_vclock);
710!
4864
        row.sync = header->sync;
710✔
4865
        coio_write_xrow(io, &row);
710✔
4866

4867
        /*
4868
         * Final stage: feed replica with WALs in range
4869
         * (start_vclock, stop_vclock).
4870
         */
4871
        relay_final_join(replica, io, header->sync, &start_vclock,
709✔
4872
                         &stop_vclock);
4873
        say_info("final data sent.");
707!
4874

4875
        /* Send end of WAL stream marker */
4876
        xrow_encode_vclock_ignore0(&row, instance_vclock);
707!
4877
        row.sync = header->sync;
707✔
4878
        coio_write_xrow(io, &row);
707✔
4879

4880
        /*
4881
         * Advance the WAL consumer state to the position where
4882
         * FINAL JOIN ended.
4883
         */
4884
        gc_consumer_advance(replica->gc, &stop_vclock);
706!
4885
}
706✔
4886

4887
void
4888
box_process_subscribe(struct iostream *io, const struct xrow_header *header)
3,596✔
4889
{
4890
        assert(header->type == IPROTO_SUBSCRIBE);
3,596!
4891

4892
        /* Check that bootstrap has been finished */
4893
        if (!is_box_configured)
3,596✔
4894
                tnt_raise(ClientError, ER_LOADING);
1,080!
4895

4896
        struct subscribe_request req;
4897
        xrow_decode_subscribe_xc(header, &req);
2,516!
4898

4899
        /* Forbid connection to itself */
4900
        if (tt_uuid_is_equal(&req.instance_uuid, &INSTANCE_UUID))
2,516!
4901
                tnt_raise(ClientError, ER_CONNECTION_TO_SELF);
×
4902
        /*
4903
         * The peer should have bootstrapped from somebody since it tries to
4904
         * subscribe already. If it belongs to a different replicaset, it won't
4905
         * ever be found here, and would try to reconnect thinking its replica
4906
         * ID wasn't replicated here yet. Prevent it right away.
4907
         */
4908
        if (!tt_uuid_is_equal(&req.replicaset_uuid, &REPLICASET_UUID)) {
2,516✔
4909
                tnt_raise(ClientError, ER_REPLICASET_UUID_MISMATCH,
7!
4910
                          tt_uuid_str(&REPLICASET_UUID),
4911
                          tt_uuid_str(&req.replicaset_uuid));
4912
        }
4913
        /*
4914
         * Replicaset name mismatch is not considered a critical error. It can
4915
         * happen if rename happened and then some replicas reconnected. They
4916
         * won't ever be able to fetch the new name if the master rejects them.
4917
         */
4918
        if (strcmp(req.replicaset_name, REPLICASET_NAME) != 0) {
2,509✔
4919
                say_warn("Replicaset name mismatch on subscribe. Peer name - %s, "
13!
4920
                         "local name - %s", node_name_str(req.replicaset_name),
4921
                         node_name_str(REPLICASET_NAME));
4922
        }
4923
        /*
4924
         * Do not allow non-anonymous followers for anonymous
4925
         * instances.
4926
         */
4927
        if (box_is_anon() && !req.is_anon) {
2,509!
4928
                tnt_raise(ClientError, ER_UNSUPPORTED, "Anonymous replica",
14!
4929
                          "non-anonymous followers.");
4930
        }
4931

4932
        /* Check replica uuid */
4933
        struct replica *replica = replica_by_uuid(&req.instance_uuid);
2,495!
4934

4935
        if (req.is_anon && replica != NULL) {
2,495✔
4936
                if (replica->id != REPLICA_ID_NIL)
34!
4937
                        tnt_raise(ClientError, ER_PROTOCOL, "Can't subscribe "
×
4938
                                  "an anonymous replica having an ID assigned");
4939
                if (!replica->anon)
34✔
4940
                        tnt_raise(ClientError, ER_PROTOCOL, "Can't subscribe "
1!
4941
                                  "a previously deleted non-anonymous replica "
4942
                                  "as an anonymous replica");
33✔
4943
        } else if (!req.is_anon &&
2,461✔
4944
                   (replica == NULL || replica->id == REPLICA_ID_NIL)) {
2,436✔
4945
                /*
4946
                 * The instance is not anonymous, and is registered (at least it
4947
                 * claims so), but its ID is not delivered to the current
4948
                 * instance yet. Need to wait until its _cluster record arrives
4949
                 * from some third node. Likely to happen on bootstrap, when
4950
                 * there is a fullmesh and 1 leader doing all the _cluster
4951
                 * registrations. Not all of them are delivered to the other
4952
                 * nodes yet.
4953
                 * Also can happen when the replica is deleted from _cluster,
4954
                 * but still tries to subscribe. It won't have an ID here.
4955
                 */
4956
                tnt_raise(ClientError, ER_TOO_EARLY_SUBSCRIBE,
12!
4957
                          tt_uuid_str(&req.instance_uuid));
4958
        }
4959
        /*
4960
         * Replica name mismatch is not considered a critical error. It can
4961
         * happen if rename happened and then the replica reconnected. It won't
4962
         * ever be able to fetch the new name if the master rejects the
4963
         * connection.
4964
         */
4965
        if (replica != NULL &&
2,482✔
4966
            strcmp(replica->name, req.instance_name) != 0) {
2,457✔
4967
                say_warn("Instance name mismatch on subscribe. "
3!
4968
                         "Peer claims name - %s, its stored name - %s",
4969
                         node_name_str(req.instance_name),
4970
                         node_name_str(replica->name));
4971
        }
4972
        struct vclock start_vclock;
4973
        box_localize_vclock(&req.vclock, &start_vclock);
2,482✔
4974
        auto replica_guard = box_connect_replica(&req.instance_uuid,
4975
                                                 &start_vclock, &replica);
4,615✔
4976
        /*
4977
         * Send a response to SUBSCRIBE request, tell
4978
         * the replica how many rows we have in stock for it,
4979
         * and identify ourselves with our own replica id.
4980
         *
4981
         * Master not only checks the replica has the same replicaset UUID, but
4982
         * also sends the UUID to the replica so both Tarantools could perform
4983
         * any checks they want depending on their version and supported
4984
         * features.
4985
         *
4986
         * Older versions not supporting replicaset UUID in the response will
4987
         * just ignore the additional field (these are < 2.1.1).
4988
         */
4989
        struct subscribe_response rsp;
4990
        memset(&rsp, 0, sizeof(rsp));
2,134✔
4991
        vclock_copy(&rsp.vclock, instance_vclock);
2,134✔
4992
        rsp.replicaset_uuid = REPLICASET_UUID;
2,134✔
4993
        strlcpy(rsp.replicaset_name, REPLICASET_NAME, NODE_NAME_SIZE_MAX);
2,134!
4994
        struct xrow_header row;
4995
        RegionGuard region_guard(&fiber()->gc);
4,267!
4996
        xrow_encode_subscribe_response(&row, &rsp);
2,134!
4997
        /*
4998
         * Identify the message with the replica id of this
4999
         * instance, this is the only way for a replica to find
5000
         * out the id of the instance it has connected to.
5001
         */
5002
        struct replica *self = replica_by_uuid(&INSTANCE_UUID);
2,134!
5003
        assert(self != NULL); /* the local registration is read-only */
2,134!
5004
        row.replica_id = self->id;
2,134✔
5005
        row.sync = header->sync;
2,134✔
5006
        coio_write_xrow(io, &row);
2,134✔
5007

5008
        say_info("subscribed replica %s at %s",
2,124!
5009
                 tt_uuid_str(&req.instance_uuid), sio_socketname(io->fd));
5010
        say_info("remote vclock %s local vclock %s",
2,124!
5011
                 vclock_to_string(&req.vclock), vclock_to_string(&rsp.vclock));
5012
        uint64_t sent_raft_term = 0;
2,124✔
5013
        if (req.version_id >= version_id(2, 6, 0) && !req.is_anon) {
2,124✔
5014
                /*
5015
                 * Send out the current raft state of the instance. Don't do
5016
                 * that if the remote instance is old. It can be that a part of
5017
                 * the cluster still contains old versions, which can't handle
5018
                 * Raft messages. Raft's network footprint should be 0 as seen
5019
                 * by such instances.
5020
                 */
5021
                struct raft_request raft_req;
5022
                box_raft_checkpoint_remote(&raft_req);
2,062!
5023
                xrow_encode_raft(&row, &fiber()->gc, &raft_req);
2,062!
5024
                relay_filter_raft(&row, req.version_id);
2,062!
5025
                coio_write_xrow(io, &row);
2,062!
5026
                sent_raft_term = raft_req.term;
2,062✔
5027
        }
5028
        /*
5029
         * Process SUBSCRIBE request via replication relay.
5030
         * Send current recovery vector clock as a marker
5031
         * of the "current" state of the master. When
5032
         * replica fetches rows up to this position,
5033
         * it enters read-write mode.
5034
         *
5035
         * @todo: this is not implemented, this is imperfect, and
5036
         * this is buggy in case there is rollback followed by
5037
         * a stall in updates (in this case replica may hang
5038
         * indefinitely).
5039
         */
5040
        relay_subscribe(replica, io, header->sync, &start_vclock,
2,124!
5041
                        req.version_id, req.id_filter, sent_raft_term);
5042
}
×
5043

5044
void
5045
box_process_vote(struct ballot *ballot)
49,955✔
5046
{
5047
        ballot->is_ro_cfg = cfg_geti("read_only") != 0;
49,955✔
5048
        const char *mode_name = cfg_gets("election_mode");
49,955✔
5049
        enum election_mode mode = election_mode_by_name(mode_name);
49,955✔
5050
        assert(mode != ELECTION_MODE_INVALID);
49,955!
5051
        ballot->can_lead = mode == ELECTION_MODE_CANDIDATE ||
49,955✔
5052
                           mode == ELECTION_MODE_MANUAL;
5053
        ballot->is_anon = cfg_replication_anon;
49,955✔
5054
        assert(!(ballot->is_anon && ballot->can_lead));
49,955!
5055
        ballot->is_ro = is_ro_summary;
49,955✔
5056
        ballot->is_booted = is_box_configured;
49,955✔
5057
        vclock_copy(&ballot->vclock, instance_vclock);
49,955✔
5058
        vclock_copy(&ballot->gc_vclock, &gc.vclock);
49,955✔
5059
        ballot->bootstrap_leader_uuid = bootstrap_leader_uuid;
49,955✔
5060
        if (*INSTANCE_NAME != '\0') {
49,955✔
5061
                strlcpy(ballot->instance_name, INSTANCE_NAME,
3,107✔
5062
                        NODE_NAME_SIZE_MAX);
5063
        } else if (*cfg_instance_name != '\0') {
46,848✔
5064
                strlcpy(ballot->instance_name, cfg_instance_name,
2,542✔
5065
                        NODE_NAME_SIZE_MAX);
5066
        } else {
5067
                *ballot->instance_name = '\0';
44,306✔
5068
        }
5069
        int i = 0;
49,955✔
5070
        replicaset_foreach(replica) {
203,251✔
5071
                if (replica->id != 0)
153,296✔
5072
                        ballot->registered_replica_uuids[i++] = replica->uuid;
146,846✔
5073
        }
5074
        assert(i < VCLOCK_MAX);
49,955!
5075

5076
        ballot->registered_replica_uuids_size = i;
49,955✔
5077
}
49,955✔
5078

5079
/** Fill _schema space with initial data on bootstrap. */
5080
static void
5081
box_populate_schema_space(void)
2,877✔
5082
{
5083
        struct tt_uuid replicaset_uuid;
5084
        if (box_check_replicaset_uuid(&replicaset_uuid) != 0)
2,877!
5085
                diag_raise();
×
5086
        char replicaset_name[NODE_NAME_SIZE_MAX];
5087
        if (box_check_replicaset_name(replicaset_name) != 0)
2,877!
5088
                diag_raise();
×
5089
        char cluster_name[NODE_NAME_SIZE_MAX];
5090
        if (box_check_cluster_name(cluster_name) != 0)
2,877!
5091
                diag_raise();
×
5092

5093
        if (tt_uuid_is_nil(&replicaset_uuid))
2,877✔
5094
                tt_uuid_create(&replicaset_uuid);
2,859!
5095
        if (boxk(IPROTO_INSERT, BOX_SCHEMA_ID, "[%s%s]", "replicaset_uuid",
2,877!
5096
                 tt_uuid_str(&replicaset_uuid)))
2,877!
5097
                diag_raise();
×
5098
        box_set_cluster_name_record(cluster_name);
2,877!
5099
        box_set_replicaset_name_record(replicaset_name);
2,877!
5100
        if (bootstrap_strategy == BOOTSTRAP_STRATEGY_SUPERVISED)
2,877✔
5101
                box_set_bootstrap_leader_record();
2!
5102
}
2,877✔
5103

5104
static void
5105
box_on_indexes_built(void)
4,349✔
5106
{
5107
        box_run_on_recovery_state(RECOVERY_STATE_INDEXES_BUILT);
4,349✔
5108
}
4,349✔
5109

5110
/**
5111
 * Runs all triggers from event 'tarantool.trigger.on_change' with one
5112
 * argument: name of the changed event.
5113
 * Each returned value is ignored, all thrown errors are logged.
5114
 */
5115
static int
5116
box_trigger_on_change(struct trigger *trigger, void *data)
52,526✔
5117
{
5118
        (void)trigger;
5119
        assert(tarantool_trigger_on_change_event != NULL);
52,526!
5120
        struct event *on_change_event = tarantool_trigger_on_change_event;
52,526✔
5121
        struct event *event = (struct event *)data;
52,526✔
5122

5123
        if (!event_has_triggers(on_change_event))
52,526✔
5124
                return 0;
52,515✔
5125

5126
        struct port args;
5127
        port_c_create(&args);
11!
5128
        port_c_add_str0(&args, event->name);
11!
5129
        event_run_triggers_no_fail(on_change_event, &args);
11!
5130
        port_destroy(&args);
11!
5131
        return 0;
11✔
5132
}
5133

5134
static TRIGGER(box_trigger_on_change_trigger, box_trigger_on_change);
5135

5136
static void
5137
engine_init()
4,476✔
5138
{
5139
        /*
5140
         * Sic: order is important here, since
5141
         * memtx must be the first to participate
5142
         * in checkpoints (in enigne_foreach order),
5143
         * so it must be registered first.
5144
         */
5145
        struct memtx_engine *memtx;
5146
        memtx = memtx_engine_new_xc(cfg_gets("memtx_dir"),
8,952✔
5147
                                    box_is_force_recovery,
5148
                                    cfg_getd("memtx_memory"),
4,476✔
5149
                                    cfg_geti("memtx_min_tuple_size"),
4,476✔
5150
                                    cfg_geti("strip_core"),
4,476✔
5151
                                    cfg_geti("slab_alloc_granularity"),
4,476✔
5152
                                    cfg_gets("memtx_allocator"),
5153
                                    cfg_getd("slab_alloc_factor"),
4,476✔
5154
                                    cfg_geti("memtx_sort_threads"),
5155
                                    box_on_indexes_built);
5156
        engine_register((struct engine *)memtx);
4,475✔
5157
        assert(memtx->base.id < MAX_TX_ENGINE_COUNT);
4,475!
5158
        box_set_memtx_max_tuple_size();
4,475✔
5159

5160
        memcs_engine_register();
4,475✔
5161

5162
        struct engine *vinyl;
5163
        vinyl = vinyl_engine_new_xc(cfg_gets("vinyl_dir"),
8,950✔
5164
                                    cfg_geti64("vinyl_memory"),
4,475✔
5165
                                    cfg_geti("vinyl_read_threads"),
5166
                                    cfg_geti("vinyl_write_threads"),
5167
                                    box_is_force_recovery);
5168
        engine_register(vinyl);
4,475✔
5169
        assert(vinyl->id < MAX_TX_ENGINE_COUNT);
4,475!
5170
        box_set_vinyl_max_tuple_size();
4,475✔
5171
        box_set_vinyl_cache();
4,475✔
5172
        box_set_vinyl_timeout();
4,475✔
5173

5174
        struct sysview_engine *sysview = sysview_engine_new_xc();
4,475✔
5175
        engine_register((struct engine *)sysview);
4,475✔
5176

5177
        struct engine *service_engine = service_engine_new_xc();
4,475✔
5178
        engine_register(service_engine);
4,475✔
5179

5180
        struct engine *blackhole = blackhole_engine_new_xc();
4,475✔
5181
        engine_register(blackhole);
4,475✔
5182
}
4,475✔
5183

5184
/**
5185
 * Blindly apply whatever comes from bootstrap. This is either a
5186
 * local snapshot, or received from a remote master. In both cases
5187
 * it is not WAL - records are not sorted by their commit order,
5188
 * and don't have headers.
5189
 */
5190
static int
5191
bootstrap_journal_write(struct journal *base, struct journal_entry *entry)
2,275,120✔
5192
{
5193
        (void)base;
5194

5195
        entry->res = 0;
2,275,120✔
5196
        journal_async_complete(entry);
2,275,120✔
5197
        return 0;
2,275,120✔
5198
}
5199

5200
/**
5201
 * Wait until every remote peer that managed to connect chooses this node as its
5202
 * bootstrap leader and fail otherwise.
5203
 */
5204
static void
5205
check_bootstrap_unanimity(void)
2,847✔
5206
{
5207
        replicaset_foreach(replica) {
3,346✔
5208
                struct applier *applier = replica->applier;
500✔
5209
                if (applier == NULL || applier->state != APPLIER_CONNECTED)
500!
5210
                        continue;
×
5211
                struct ballot *ballot = &applier->ballot;
500✔
5212
                const char *replica_str =
5213
                        tt_sprintf("%s at %s", tt_uuid_str(&applier->uuid),
500✔
5214
                                   applier_uri_str(applier));
5215
                say_info("Checking if %s chose this instance as bootstrap "
500!
5216
                         "leader", replica_str);
5217
                if (tt_uuid_is_nil(&ballot->bootstrap_leader_uuid))
500✔
5218
                        applier_wait_bootstrap_leader_uuid_is_set(applier);
266✔
5219
                if (tt_uuid_compare(&ballot->bootstrap_leader_uuid,
499✔
5220
                                    &INSTANCE_UUID) != 0) {
499!
5221
                        tnt_raise(ClientError, ER_BOOTSTRAP_NOT_UNANIMOUS,
×
5222
                                  tt_uuid_str(&replica->uuid),
5223
                                  tt_uuid_str(&ballot->bootstrap_leader_uuid));
5224
                }
5225
        }
5226
}
2,846✔
5227

5228
/** Ensure the configured and stored global identifiers (UUID) match. */
5229
static int
5230
check_global_ids_integrity(void)
1,531✔
5231
{
5232
        char cluster_name[NODE_NAME_SIZE_MAX];
5233
        char replicaset_name[NODE_NAME_SIZE_MAX];
5234
        char instance_name[NODE_NAME_SIZE_MAX];
5235
        struct tt_uuid replicaset_uuid;
5236
        if (box_check_cluster_name(cluster_name) != 0 ||
1,531!
5237
            box_check_replicaset_name(replicaset_name) != 0 ||
1,531!
5238
            box_check_replicaset_uuid(&replicaset_uuid) != 0 ||
4,593!
5239
            box_check_instance_name(instance_name) != 0)
1,531!
5240
                return -1;
×
5241

5242
        if (*cluster_name != 0 && strcmp(cluster_name, CLUSTER_NAME) != 0) {
1,531✔
5243
                diag_set(ClientError, ER_CLUSTER_NAME_MISMATCH,
12!
5244
                         node_name_str(cluster_name),
5245
                         node_name_str(CLUSTER_NAME));
5246
                return -1;
12✔
5247
        }
5248
        if (*replicaset_name != 0 &&
1,519✔
5249
            strcmp(replicaset_name, REPLICASET_NAME) != 0) {
198✔
5250
                diag_set(ClientError, ER_REPLICASET_NAME_MISMATCH,
12!
5251
                         node_name_str(replicaset_name),
5252
                         node_name_str(REPLICASET_NAME));
5253
                return -1;
12✔
5254
        }
5255
        if (!tt_uuid_is_nil(&replicaset_uuid) &&
1,527✔
5256
            !tt_uuid_is_equal(&replicaset_uuid, &REPLICASET_UUID)) {
20✔
5257
                diag_set(ClientError, ER_REPLICASET_UUID_MISMATCH,
3!
5258
                         tt_uuid_str(&replicaset_uuid),
5259
                         tt_uuid_str(&REPLICASET_UUID));
5260
                return -1;
3✔
5261
        }
5262
        if (*instance_name != 0 && strcmp(instance_name, INSTANCE_NAME) != 0) {
1,504✔
5263
                diag_set(ClientError, ER_INSTANCE_NAME_MISMATCH,
12!
5264
                         node_name_str(instance_name),
5265
                         node_name_str(INSTANCE_NAME));
5266
                return -1;
12✔
5267
        }
5268
        return 0;
1,492✔
5269
}
5270

5271
/**
5272
 * Initialize the first replica of a new replica set.
5273
 */
5274
static void
5275
bootstrap_master(void)
2,882✔
5276
{
5277
        /* Do not allow bootstrapping a read-only instance as master. */
5278
        if (cfg_geti("read_only") == 1) {
2,882✔
5279
                tnt_raise(ClientError, ER_BOOTSTRAP_READONLY);
1!
5280
        }
5281
        /*
5282
         * With "auto" bootstrap strategy refuse to boot unless everyone agrees
5283
         * this node is the bootstrap leader.
5284
         */
5285
        if (bootstrap_strategy == BOOTSTRAP_STRATEGY_AUTO)
2,881✔
5286
                check_bootstrap_unanimity();
2,847✔
5287
        engine_bootstrap_xc();
2,880✔
5288
        if (box_set_replication_synchro_queue_max_size() != 0)
2,877!
5289
                diag_raise();
×
5290

5291
        uint32_t replica_id = 1;
2,877✔
5292
        box_insert_replica_record(replica_id, &INSTANCE_UUID,
2,877✔
5293
                                  cfg_instance_name);
5294
        assert(replica_by_uuid(&INSTANCE_UUID)->id == 1);
2,877!
5295
        assert(strcmp(cfg_instance_name, INSTANCE_NAME) == 0);
2,877!
5296
        box_populate_schema_space();
2,877✔
5297

5298
        /* Enable WAL subsystem. */
5299
        if (wal_enable() != 0)
2,877!
5300
                diag_raise();
×
5301

5302
        /* Make the initial checkpoint */
5303
        if (gc_checkpoint() != 0) {
2,877✔
5304
                say_error("failed to create a checkpoint");
1!
5305
                diag_raise();
1✔
5306
        }
5307

5308
        box_run_on_recovery_state(RECOVERY_STATE_SNAPSHOT_RECOVERED);
2,876✔
5309
        box_run_on_recovery_state(RECOVERY_STATE_WAL_RECOVERED);
2,876✔
5310
        assert(!recovery_state_synced_is_reached);
2,876!
5311
        box_run_on_recovery_state(RECOVERY_STATE_SYNCED);
2,876✔
5312
        recovery_state_synced_is_reached = true;
2,876✔
5313
}
2,876✔
5314

5315
/**
5316
 * Bootstrap from the remote master
5317
 * \pre  master->applier->state == APPLIER_CONNECTED
5318
 * \post master->applier->state == APPLIER_READY
5319
 *
5320
 * \throws an exception in case of an unrecoverable error
5321
 * \return false in case of a transient error
5322
 *         true in case everything is fine
5323
 */
5324
static bool
5325
bootstrap_from_master(struct replica *master)
742✔
5326
{
5327
        struct applier *applier = master->applier;
742✔
5328
        assert(applier != NULL);
742!
5329
        try {
5330
                applier_resume_to_state(applier, APPLIER_READY,
742✔
5331
                                        TIMEOUT_INFINITY);
5332
        } catch (FiberIsCancelled *e) {
2✔
5333
                throw e;
1✔
5334
        } catch (...) {
×
5335
                return false;
×
5336
        }
5337
        assert(applier->state == APPLIER_READY);
741!
5338
        /*
5339
         * In case of rejoin the vclock could be already set to send it in the
5340
         * ballot and for other global things. Make it unset again so the
5341
         * applier could "init" it again.
5342
         */
5343
        vclock_clear(&instance_vclock_storage);
741✔
5344
        say_info("bootstrapping replica from %s at %s",
741!
5345
                 tt_uuid_str(&master->uuid),
5346
                 sio_strfaddr(&applier->addr, applier->addr_len));
5347

5348
        /*
5349
         * Send JOIN request to master
5350
         * See box_process_join().
5351
         */
5352
        assert(!tt_uuid_is_nil(&INSTANCE_UUID));
741!
5353
        try {
5354
                applier_resume_to_state(applier, APPLIER_FETCH_SNAPSHOT,
741✔
5355
                                        TIMEOUT_INFINITY);
5356
        } catch (FiberIsCancelled *e) {
×
5357
                throw e;
×
5358
        } catch (...) {
10✔
5359
                return false;
10✔
5360
        }
5361

5362
        /*
5363
         * Process initial data (snapshot or dirty disk data).
5364
         */
5365
        engine_begin_initial_recovery_xc(NULL);
731✔
5366
        enum applier_state wait_state = cfg_replication_anon ?
731✔
5367
                                        APPLIER_FETCHED_SNAPSHOT :
5368
                                        APPLIER_FINAL_JOIN;
5369
        applier_resume_to_state(applier, wait_state, TIMEOUT_INFINITY);
731✔
5370

5371
        box_run_on_recovery_state(RECOVERY_STATE_SNAPSHOT_RECOVERED);
730✔
5372

5373
        /*
5374
         * Process final data (WALs).
5375
         */
5376
        engine_begin_final_recovery_xc();
730✔
5377
        recovery_journal_create(&instance_vclock_storage);
730✔
5378

5379
        if (!cfg_replication_anon) {
730✔
5380
                applier_resume_to_state(applier, APPLIER_JOINED,
701✔
5381
                                        TIMEOUT_INFINITY);
5382
        }
5383
        /* Finalize the new replica */
5384
        engine_end_recovery_xc();
727✔
5385
        if (box_set_replication_synchro_queue_max_size() != 0)
727!
5386
                diag_raise();
×
5387

5388
        /* Switch applier to initial state */
5389
        applier_resume_to_state(applier, APPLIER_READY, TIMEOUT_INFINITY);
727✔
5390
        assert(applier->state == APPLIER_READY);
727!
5391

5392
        /*
5393
         * An engine may write to WAL on its own during the join
5394
         * stage (e.g. Vinyl's deferred DELETEs). That's OK - those
5395
         * records will pass through the recovery journal and wind
5396
         * up in the initial checkpoint. However, we must enable
5397
         * the WAL right before starting checkpointing so that
5398
         * records written during and after the initial checkpoint
5399
         * go to the real WAL and can be recovered after restart.
5400
         * This also clears the recovery journal created on stack.
5401
         */
5402
        if (wal_enable() != 0)
727!
5403
                diag_raise();
×
5404

5405
        /* Make the initial checkpoint */
5406
        if (gc_checkpoint() != 0) {
727✔
5407
                say_error("failed to create a checkpoint");
2!
5408
                diag_raise();
2✔
5409
        }
5410

5411
        box_run_on_recovery_state(RECOVERY_STATE_WAL_RECOVERED);
725✔
5412

5413
        return true;
725✔
5414
}
5415
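/*
 * A rough sketch of the applier state progression that the function above
 * drives, derived from its applier_resume_to_state() calls (anonymous
 * replicas skip the registration step):
 *
 *   APPLIER_READY
 *     -> APPLIER_FETCH_SNAPSHOT                 (JOIN request sent)
 *     -> APPLIER_FETCHED_SNAPSHOT               (anonymous replica)
 *        or APPLIER_FINAL_JOIN -> APPLIER_JOINED
 *     -> APPLIER_READY                          (back to the initial state)
 */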

5416
/**
5417
 * Bootstrap a new instance either as the first master in a
5418
 * replica set or as a replica of an existing master.
5419
 *
5420
 * @param[out] is_bootstrap_leader  set if this instance is
5421
 *                                  the leader of a new cluster
5422
 */
5423
static void
5424
bootstrap(bool *is_bootstrap_leader)
3,626✔
5425
{
5426
        struct tt_uuid instance_uuid;
5427
        if (box_check_instance_uuid(&instance_uuid) != 0)
3,626!
5428
                diag_raise();
×
5429

5430
        assert(tt_uuid_is_nil(&INSTANCE_UUID));
3,626!
5431
        if (!tt_uuid_is_nil(&instance_uuid))
3,626✔
5432
                INSTANCE_UUID = instance_uuid;
92✔
5433
        else
5434
                tt_uuid_create(&INSTANCE_UUID);
3,534!
5435

5436
        replicaset_state = REPLICASET_BOOTSTRAP;
3,626✔
5437
        box_broadcast_id();
3,626!
5438
        box_broadcast_ballot();
3,626!
5439
        say_info("instance uuid %s", tt_uuid_str(&INSTANCE_UUID));
3,626!
5440

5441
        /*
5442
         * Begin listening on the socket to enable
5443
         * master-master replication leader election.
5444
         */
5445
        if (box_listen() != 0)
3,626!
5446
                diag_raise();
1✔
5447
        /*
5448
         * Wait for the cluster to start up.
5449
         *
5450
         * Note, when bootstrapping a new instance, we try to
5451
         * connect to all masters during timeout to make sure
5452
         * all replicas receive the same replica set UUID when
5453
         * a new cluster is deployed. If we fail to do so, settle
5454
         * with connecting to 'replication_connect_quorum' masters.
5455
         * If this also fails, throw an error.
5456
         */
5457
        struct replica *master;
5458
        /*
5459
         * Rebootstrap
5460
         *
5461
         * 1) Try to connect to all nodes in the replicaset during
5462
         *    the waiting period in order to update their ballot.
5463
         *
5464
         * 2) Once the ballots of all nodes are updated, look
5465
         *    for a new master.
5466
         *
5467
         * 3) Wait until bootstrap_from_master succeeds.
5468
         */
5469
        double timeout = replication_timeout;
3,625✔
5470
        bool say_once = false;
3,625✔
5471

5472
        while (true) {
5473
                box_restart_replication();
3,635✔
5474
                master = replicaset_find_join_master();
3,619✔
5475
                bootstrap_leader_uuid = master == NULL ? INSTANCE_UUID :
3,616✔
5476
                                                         master->uuid;
5477
                box_broadcast_ballot();
3,616!
5478

5479
                if (master == NULL ||
4,576✔
5480
                    tt_uuid_is_equal(&master->uuid, &INSTANCE_UUID)) {
960✔
5481
                        bootstrap_master();
2,882✔
5482
                        *is_bootstrap_leader = true;
2,876✔
5483
                        break;
2,876✔
5484
                }
5485
                if (bootstrap_from_master(master)) {
734✔
5486
                        if (check_global_ids_integrity() != 0)
719!
5487
                                diag_raise();
14✔
5488
                        *is_bootstrap_leader = false;
705✔
5489
                        break;
705✔
5490
                }
5491
                if (!say_once) {
10✔
5492
                        say_info("rebootstrap failed, will retry "
8!
5493
                                 "every %.2lf second", timeout);
5494
                }
5495

5496
                say_once = true;
10✔
5497
                fiber_sleep(timeout);
10✔
5498
        }
5499
}
3,581✔
5500
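/*
 * Illustrative only: a minimal Lua configuration that exercises the
 * bootstrap path above for a two-instance replica set. The URIs and
 * credentials are placeholders, not taken from this file.
 *
 *   box.cfg{
 *       listen = 3301,
 *       replication = {'replicator:pass@host1:3301',
 *                      'replicator:pass@host2:3301'},
 *       replication_connect_timeout = 30,
 *       replication_timeout = 1,
 *   }
 */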

5501
/**
5502
 * Recover the instance from the local directory.
5503
 * Enter hot standby if the directory is locked.
5504
 * Invoke rebootstrap if the instance fell too much
5505
 * behind its peers in the replica set and needs
5506
 * to be rebootstrapped.
5507
 */
5508
static void
5509
local_recovery(const struct vclock *checkpoint_vclock)
847✔
5510
{
5511
        assert(!tt_uuid_is_nil(&INSTANCE_UUID));
847!
5512
        struct tt_uuid instance_uuid;
5513
        if (box_check_instance_uuid(&instance_uuid) != 0)
847!
5514
                diag_raise();
×
5515

5516
        replicaset_state = REPLICASET_RECOVERY;
847✔
5517
        if (!tt_uuid_is_nil(&instance_uuid) &&
871✔
5518
            !tt_uuid_is_equal(&instance_uuid, &INSTANCE_UUID)) {
24✔
5519
                tnt_raise(ClientError, ER_INSTANCE_UUID_MISMATCH,
1!
5520
                          tt_uuid_str(&instance_uuid),
5521
                          tt_uuid_str(&INSTANCE_UUID));
5522
        }
5523

5524
        say_info("instance uuid %s", tt_uuid_str(&INSTANCE_UUID));
846!
5525

5526
        struct wal_stream wal_stream;
5527
        wal_stream_create(&wal_stream);
846✔
5528
        auto stream_guard = make_scoped_guard([&]{
30✔
5529
                wal_stream_abort(&wal_stream);
30✔
5530
                wal_stream_destroy(&wal_stream);
30✔
5531
        });
896!
5532
        struct recovery *recovery = recovery_new(
846!
5533
                wal_dir(), box_is_force_recovery, checkpoint_vclock);
846✔
5534
        /*
5535
         * Make sure we report the actual recovery position
5536
         * in box.info while local recovery is in progress.
5537
         */
5538
        box_vclock = &recovery->vclock;
846✔
5539
        auto guard = make_scoped_guard([&]{
843✔
5540
                box_vclock = instance_vclock;
843✔
5541
                recovery_delete(recovery);
843✔
5542
        });
896!
5543

5544
        /*
5545
         * Initialize the replica set vclock from recovery.
5546
         * The local WAL may contain rows from remote masters,
5547
         * so we must reflect this in the replicaset vclock to
5548
         * not attempt to apply these rows twice.
5549
         */
5550
        recovery_scan(recovery, &instance_vclock_storage, &gc.vclock,
846!
5551
                      &wal_stream.base);
5552
        box_broadcast_ballot();
845!
5553
        say_info("instance vclock %s", vclock_to_string(instance_vclock));
845!
5554

5555
        if (wal_dir_lock >= 0) {
845✔
5556
                if (box_listen() != 0)
841!
5557
                        diag_raise();
×
5558
                box_update_replication();
841!
5559

5560
                struct replica *master;
5561
                double timeout = replication_timeout;
841✔
5562
                bool say_once = false;
841✔
5563

5564
                while (replicaset_needs_rejoin(&master)) {
841!
5565
                        bootstrap_leader_uuid = master->uuid;
8✔
5566
                        box_broadcast_ballot();
8!
5567
                        if (!say_once) {
8!
5568
                                say_crit("replica is too old, initiating "
8!
5569
                                         "rebootstrap");
5570
                        }
5571

5572
                        if (bootstrap_from_master(master))
8!
5573
                                return;
6✔
5574
                        box_restart_replication();
×
5575

5576
                        if (!say_once) {
×
5577
                                say_info("rebootstrap failed, will retry "
×
5578
                                         "every %.2lf second", timeout);
5579
                        }
5580

5581
                        say_once = true;
×
5582
                        fiber_sleep(timeout);
×
5583
                }
5584
        }
5585

5586
        /*
5587
         * recovery->vclock is needed by Vinyl to filter
5588
         * WAL rows that were dumped before restart.
5589
         *
5590
         * XXX: Passing an internal member of the recovery
5591
         * object to an engine is an ugly hack. Instead we
5592
         * should introduce a space_vtab::apply_wal_row method
5593
         * and explicitly pass the statement LSN to it.
5594
         */
5595
        engine_begin_initial_recovery_xc(&recovery->vclock);
837!
5596

5597
        struct memtx_engine *memtx;
5598
        memtx = (struct memtx_engine *)engine_by_name("memtx");
837!
5599
        assert(memtx != NULL);
837!
5600

5601
        /*
5602
         * We explicitly request memtx to recover its
5603
         * snapshot as a separate phase since it contains
5604
         * data for system spaces, and triggers on
5605
         * recovery of system spaces issue DDL events in
5606
         * other engines.
5607
         */
5608
        memtx_engine_recover_snapshot_xc(memtx, checkpoint_vclock);
837✔
5609
        box_run_on_recovery_state(RECOVERY_STATE_SNAPSHOT_RECOVERED);
820!
5610
        /*
5611
         * The xlog starts after the snapshot. Hence the recovery vclock must point at the
5612
         * end of snapshot (= checkpoint vclock).
5613
         */
5614
        struct vclock recovery_vclock;
5615
        vclock_copy(&recovery_vclock, checkpoint_vclock);
820✔
5616
        recovery_journal_create(&recovery_vclock);
820✔
5617
        engine_begin_final_recovery_xc();
820!
5618
        recover_remaining_wals(recovery, &wal_stream.base, NULL, false);
820✔
5619
        if (wal_stream_has_unfinished_tx(&wal_stream)) {
817!
5620
                diag_set(XlogError, "found a not finished transaction "
1!
5621
                         "in the log");
5622
                wal_stream_abort(&wal_stream);
1!
5623
                if (!box_is_force_recovery)
1!
5624
                        diag_raise();
×
5625
                diag_log();
1!
5626
        }
5627
        /*
5628
         * Leave hot standby mode, if any, only after
5629
         * acquiring the lock.
5630
         */
5631
        if (wal_dir_lock < 0) {
817✔
5632
                title("hot_standby");
4!
5633
                say_info("Entering hot standby mode");
4!
5634
                engine_begin_hot_standby_xc();
4✔
5635
                recovery_follow_local(recovery, &wal_stream.base, "hot_standby",
3!
5636
                                      cfg_getd("wal_dir_rescan_delay"));
5637
                while (!fiber_is_cancelled()) {
32✔
5638
                        if (path_lock(wal_dir(), &wal_dir_lock))
30!
5639
                                diag_raise();
×
5640
                        if (wal_dir_lock >= 0)
30✔
5641
                                break;
1✔
5642
                        fiber_sleep(0.1);
29✔
5643
                }
5644
                recovery_stop_local(recovery);
3!
5645
                fiber_testcancel();
3✔
5646
                recover_remaining_wals(recovery, &wal_stream.base, NULL, true);
1!
5647
                if (wal_stream_has_unfinished_tx(&wal_stream)) {
1!
5648
                        diag_set(XlogError, "found a not finished transaction "
×
5649
                                 "in the log in hot standby mode");
5650
                        wal_stream_abort(&wal_stream);
×
5651
                        if (!box_is_force_recovery)
×
5652
                                diag_raise();
×
5653
                        diag_log();
×
5654
                }
5655
                /*
5656
                 * Advance replica set vclock to reflect records
5657
                 * applied in hot standby mode.
5658
                 */
5659
                vclock_copy(&instance_vclock_storage, &recovery->vclock);
1✔
5660
                if (box_listen() != 0)
1!
5661
                        diag_raise();
×
5662
                box_update_replication();
1!
5663
        } else if (vclock_compare(instance_vclock, &recovery->vclock) != 0) {
813✔
5664
                /*
5665
                 * There are several reasons for a node to recover a vclock not
5666
                 * matching the one scanned initially:
5667
                 *
5668
                 * 1) someone else might append the files we are reading.
5669
                 *    This shouldn't typically happen with Tarantool >= 1.7.x,
5670
                 *    because it takes a lock on WAL directory.
5671
                 *    But it's still possible in some crazy setup, for example
5672
                 *    one could recover from another instance's symlinked xlogs.
5673
                 *
5674
                 * 2) Non-matching (by signature) snaps and xlogs, i.e.:
5675
                 *    a) snaps from a remote instance together with local
5676
                 *       instance's xlogs.
5677
                 *    b) snaps/xlogs from Tarantool 1.6.
5678
                 *
5679
                 * The second case could be distinguished from the first one,
5680
                 * but this would require unnecessarily re-reading an xlog
5681
                 * preceding the latest snap, just to make sure the WAL doesn't
5682
                 * span over the snap creation signature.
5683
                 *
5684
                 * Allow both cases, if the user has set force_recovery.
5685
                 */
5686
                const char *mismatch_str =
5687
                        tt_sprintf("Replicaset vclock %s doesn't match "
2!
5688
                                   "recovered data %s",
5689
                                   vclock_to_string(instance_vclock),
5690
                                   vclock_to_string(&recovery->vclock));
2!
5691
                if (box_is_force_recovery) {
2✔
5692
                        say_warn("%s: ignoring, because 'force_recovery' "
1!
5693
                                 "configuration option is set.", mismatch_str);
5694
                        vclock_copy(&instance_vclock_storage,
1✔
5695
                                    &recovery->vclock);
1✔
5696
                } else {
5697
                        panic("Can't proceed. %s.", mismatch_str);
1!
5698
                }
5699
        }
5700
        stream_guard.is_active = false;
813✔
5701
        recovery_finalize(recovery);
813!
5702
        wal_stream_destroy(&wal_stream);
813!
5703

5704
        /*
5705
         * We must enable WAL before finalizing engine recovery,
5706
         * because an engine may start writing to WAL right after
5707
         * this point (e.g. deferred DELETE statements in Vinyl).
5708
         * This also clears the recovery journal created on the stack.
5709
         */
5710
        if (wal_enable() != 0)
813!
5711
                diag_raise();
×
5712

5713
        engine_end_recovery_xc();
813✔
5714
        if (check_global_ids_integrity() != 0)
812!
5715
                diag_raise();
25✔
5716
        box_run_on_recovery_state(RECOVERY_STATE_WAL_RECOVERED);
787!
5717
}
5718

5719
static void
5720
tx_prio_cb(struct ev_loop *loop, ev_watcher *watcher, int events)
4,534,480✔
5721
{
5722
        (void) loop;
5723
        (void) events;
5724
        struct cbus_endpoint *endpoint = (struct cbus_endpoint *)watcher->data;
4,534,480✔
5725
#ifndef NDEBUG
5726
        /*
5727
         * The sleep is legal because it is not a fiber sleep. It puts the
5728
         * entire thread to sleep to simulate it being slow. It can happen in
5729
         * reality if the thread somewhy isn't scheduled for too long.
5730
         */
5731
        struct errinj *inj = errinj(ERRINJ_TX_DELAY_PRIO_ENDPOINT,
4,534,480!
5732
                                    ERRINJ_DOUBLE);
5733
        if (inj->dparam != 0)
4,534,480✔
5734
                usleep(inj->dparam * 1000000);
4✔
5735
#endif
5736
        cbus_process(endpoint);
4,534,480✔
5737
}
4,534,480✔
5738

5739
static void
5740
on_wal_garbage_collection(const struct vclock *vclock)
4✔
5741
{
5742
        gc_advance(vclock);
4✔
5743
}
4✔
5744

5745
static void
5746
on_wal_checkpoint_threshold(void)
4✔
5747
{
5748
        say_info("WAL threshold exceeded, triggering checkpoint");
4!
5749
        gc_trigger_checkpoint();
4✔
5750
}
4✔
5751

5752
bool
5753
box_is_configured(void)
1,935,600✔
5754
{
5755
        return is_box_configured;
1,935,600✔
5756
}
5757

5758
int
5759
box_check_configured(void)
2,523✔
5760
{
5761
        if (!is_box_configured) {
2,523✔
5762
                diag_set(ClientError, ER_UNCONFIGURED);
12!
5763
                return -1;
12✔
5764
        }
5765
        return 0;
2,511✔
5766
}
5767

5768
static void
5769
box_cfg_xc(void)
4,476✔
5770
{
5771
        box_set_force_recovery();
4,476!
5772
        box_storage_init();
4,476✔
5773
        title("loading");
4,474!
5774

5775
        if (box_set_prepared_stmt_cache_size() != 0)
4,474!
5776
                diag_raise();
×
5777
        box_set_net_msg_max();
4,474!
5778
        box_set_readahead();
4,474!
5779
        box_set_too_long_threshold();
4,474!
5780
        box_set_replication_timeout();
4,474!
5781
        if (box_set_bootstrap_strategy() != 0)
4,474!
5782
                diag_raise();
×
5783
        if (box_set_bootstrap_leader() != 0)
4,474!
5784
                diag_raise();
×
5785
        box_set_replication_connect_timeout();
4,474!
5786
        if (bootstrap_strategy == BOOTSTRAP_STRATEGY_LEGACY)
4,474✔
5787
                box_set_replication_connect_quorum();
412!
5788
        box_set_replication_sync_lag();
4,474!
5789
        if (box_set_replication_synchro_quorum() != 0)
4,474!
5790
                diag_raise();
×
5791
        if (box_set_replication_synchro_timeout() != 0)
4,474!
5792
                diag_raise();
×
5793
        if (box_set_replication_synchro_queue_max_size() != 0)
4,474!
5794
                diag_raise();
×
5795
        box_set_replication_sync_timeout();
4,474!
5796
        if (box_check_instance_name(cfg_instance_name) != 0)
4,474!
5797
                diag_raise();
×
5798
        if (box_set_wal_queue_max_size() != 0)
4,474!
5799
                diag_raise();
×
5800
        cfg_replication_anon = box_check_replication_anon();
4,474!
5801
        box_broadcast_ballot();
4,474!
5802
        /*
5803
         * Must be set before opening the server port, because it may be
5804
         * requested by a client before the configuration is completed.
5805
         */
5806
        box_set_auth_type();
4,474!
5807

5808
        struct gc_checkpoint *checkpoint = gc_last_checkpoint();
4,474✔
5809

5810
        /*
5811
         * Lock the write ahead log directory to avoid multiple
5812
         * instances running in the same dir.
5813
         */
5814
        if (path_lock(wal_dir(), &wal_dir_lock) < 0)
4,474!
5815
                diag_raise();
1✔
5816
        if (wal_dir_lock < 0) {
4,473✔
5817
                /**
5818
                 * The directory is busy and hot standby mode is off:
5819
                 * refuse to start. In hot standby mode, a busy
5820
                 * WAL dir must contain at least one xlog.
5821
                 */
5822
                if (!cfg_geti("hot_standby") || checkpoint == NULL)
4!
5823
                        tnt_raise(ClientError, ER_ALREADY_RUNNING, wal_dir());
×
5824
        }
5825

5826
        struct journal bootstrap_journal;
5827
        journal_create(&bootstrap_journal, bootstrap_journal_write);
4,473✔
5828
        journal_set(&bootstrap_journal);
4,473✔
5829
        auto bootstrap_journal_guard = make_scoped_guard([] {
96✔
5830
                journal_set(NULL);
96✔
5831
        });
8,942!
5832

5833
        bool is_bootstrap_leader = false;
4,473✔
5834
        if (checkpoint != NULL) {
4,473✔
5835
                /* Recover the instance from the local directory */
5836
                local_recovery(&checkpoint->vclock);
847✔
5837
        } else {
5838
                /* Bootstrap a new instance */
5839
                bootstrap(&is_bootstrap_leader);
3,626✔
5840
        }
5841
        /*
5842
         * During bootstrap from a remote master try not to ignore the
5843
         * conflicts, neither during the snapshot fetch nor during the join.
5844
         */
5845
        box_set_replication_skip_conflict();
4,374!
5846
        replicaset_state = REPLICASET_READY;
4,374✔
5847

5848
        /*
5849
         * replicaset.applier.vclock is filled with the real
5850
         * value since local recovery has already completed.
5851
         */
5852
        vclock_copy(&replicaset.applier.vclock, instance_vclock);
4,374✔
5853

5854
        /*
5855
         * Load persistent WAL GC consumers.
5856
         */
5857
        if (gc_load_consumers() != 0)
4,374!
5858
                diag_raise();
×
5859

5860
        /*
5861
         * Initialize GC of anonymous replicas after loading WAL GC consumers.
5862
         */
5863
        replication_anon_gc_init();
4,374!
5864

5865
        /*
5866
         * Exclude self from GC delay because we care
5867
         * about remote replicas only, still for ref/unref
5868
         * balance we do reference self node initially and
5869
         * downgrade it to zero when there is no replication
5870
         * set at all.
5871
         */
5872
        gc_delay_unref();
4,374!
5873

5874
        bootstrap_journal_guard.is_active = false;
4,374✔
5875
        assert(current_journal != &bootstrap_journal);
4,374!
5876

5877
        /*
5878
         * Check for correct registration of the instance in _cluster.
5879
         * The instance won't exist in _cluster space if it is an
5880
         * anonymous replica; add it manually in that case.
5881
         */
5882
        if (cfg_replication_anon != box_is_anon())
4,374!
5883
                panic("'replication_anon' cfg didn't work");
1!
5884
        struct replica *self = replica_by_uuid(&INSTANCE_UUID);
4,373!
5885
        if (!cfg_replication_anon) {
4,373✔
5886
                if (self == NULL || self->id == REPLICA_ID_NIL) {
4,340!
5887
                        tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
×
5888
                                  tt_uuid_str(&INSTANCE_UUID),
5889
                                  tt_uuid_str(&REPLICASET_UUID));
5890
                }
5891
        } else if (self == NULL) {
33✔
5892
                replicaset_add_anon(&INSTANCE_UUID);
28!
5893
        }
5894

5895
        rmean_cleanup(rmean_box);
4,373!
5896

5897
        /* Follow replica */
5898
        replicaset_follow();
4,373!
5899

5900
        is_box_configured = true;
4,373✔
5901
        box_broadcast_ballot();
4,373!
5902
        /*
5903
         * Fill in leader election parameters after bootstrap. Earlier it is not
5904
         * possible - there may be relevant data to recover from WAL and
5905
         * snapshot. Also until recovery is done, it is not possible to write
5906
         * new records into WAL. Another reason - before recovery is done,
5907
         * instance_id is not known, so Raft simply can't work.
5908
         */
5909
        struct raft *raft = box_raft();
4,373✔
5910
        if (!cfg_replication_anon)
4,373✔
5911
                raft_cfg_instance_id(raft, instance_id);
4,340!
5912
        raft_cfg_vclock(raft, instance_vclock);
4,373!
5913

5914
        if (box_set_election_timeout() != 0)
4,373!
5915
                diag_raise();
×
5916
        if (box_set_election_fencing_mode() != 0)
4,373!
5917
                diag_raise();
×
5918
        /*
5919
         * Election is enabled last, so that all the parameters are installed by
5920
         * that time.
5921
         */
5922
        if (box_set_election_mode() != 0)
4,373!
5923
                diag_raise();
×
5924

5925
        /*
5926
         * Enable split brain detection once the node is fully recovered or
5927
         * bootstrapped. No split brain could happen during bootstrap or local
5928
         * recovery. Only do so in an upgraded cluster. Unfortunately, schema
5929
         * version 2.10.1 was used in 2.10.0 release, while split-brain
5930
         * detection appeared in 2.10.1. So use the schema version after 2.10.1.
5931
         */
5932
        if (dd_version_id > version_id(2, 10, 1))
4,373✔
5933
                txn_limbo_filter_enable(&txn_limbo);
4,354!
5934

5935
        title("running");
4,373!
5936
        say_info("ready to accept requests");
4,373!
5937

5938
        if (!is_bootstrap_leader) {
4,373✔
5939
                replicaset_sync();
1,497✔
5940
        } else if (box_election_mode == ELECTION_MODE_CANDIDATE ||
2,876✔
5941
                   box_election_mode == ELECTION_MODE_MANUAL) {
2,828✔
5942
                /*
5943
                 * When the cluster is just bootstrapped and this instance is a
5944
                 * leader, it makes no sense to wait for a leader appearance.
5945
                 * There is no one. Moreover this node *is* a leader, so it
5946
                 * should take the control over the situation and start a new
5947
                 * term immediately.
5948
                 */
5949
                int rc = box_raft_try_promote();
60!
5950
                if (raft->leader != instance_id && raft->leader != 0) {
60!
5951
                        /*
5952
                         * It was promoted and is a single registered node -
5953
                         * there can't be another leader or a new term bump.
5954
                         */
5955
                        panic("Bootstrap master couldn't elect self as a "
×
5956
                              "leader. Leader is %u, term is %llu",
5957
                              raft->leader, (long long)raft->volatile_term);
5958
                }
5959
                assert(rc == 0);
60!
5960
                (void)rc;
5961
        }
5962

5963
        /* box.cfg.read_only is not read yet. */
5964
        assert(box_is_ro());
4,371!
5965
        /* If anyone is waiting for ro mode. */
5966
        fiber_cond_broadcast(&ro_cond);
4,371!
5967
        /*
5968
         * Yield to let ro condition waiters handle the event.
5969
         * Without yield it may happen there won't be a context
5970
         * switch until the ro state is changed again, and as a
5971
         * result, some ro waiters may sleep forever. For example,
5972
         * when Tarantool is just started, it is expected it will
5973
         * enter ro=true state, and then ro=false. Without the
5974
         * yield the ro=true event may be lost. This affects
5975
         * box.ctl.wait_ro() call.
5976
         */
5977
        fiber_sleep(0);
4,371✔
5978
}
4,371✔
5979
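/*
 * Illustrative only: the final yield above matters for Lua code that waits
 * on the read-only state, e.g.:
 *
 *   box.ctl.wait_ro(10)   -- wait up to 10 seconds for ro=true
 *   box.ctl.wait_rw(10)   -- wait up to 10 seconds for ro=false
 */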

5980
void
5981
box_cfg(void)
4,476✔
5982
{
5983
        try {
5984
                box_cfg_xc();
4,476✔
5985
        } catch (Exception *e) {
100✔
5986
                e->log();
100!
5987
                panic("can't initialize storage: %s", e->get_errmsg());
100!
5988
        }
5989
}
4,371✔
5990
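/*
 * box_cfg() backs the Lua box.cfg{} call; a failure at this level is fatal
 * by design (see the panic above). A minimal invocation, for illustration:
 *
 *   box.cfg{listen = 3301}   -- the first call configures and starts storage
 */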

5991
int
5992
box_checkpoint(void)
1,314✔
5993
{
5994
        assert(is_box_configured);
1,314!
5995
        return gc_checkpoint();
1,314✔
5996
}
5997

5998
void
5999
box_checkpoint_async(void)
2✔
6000
{
6001
        if (!is_box_configured || is_storage_shutdown)
2!
6002
                return;
×
6003
        gc_trigger_checkpoint();
2✔
6004
}
6005
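/*
 * For reference: a checkpoint can also be requested from Lua with
 * box.snapshot(), which waits for the checkpoint to finish, whereas the
 * async variant above merely triggers one through the garbage collector:
 *
 *   box.snapshot()   -- blocks until the new snapshot is ready
 */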

6006
int
6007
box_backup_start(int checkpoint_idx, box_backup_cb cb, void *cb_arg)
13✔
6008
{
6009
        assert(checkpoint_idx >= 0);
13!
6010
        if (backup_gc != NULL) {
13✔
6011
                diag_set(ClientError, ER_BACKUP_IN_PROGRESS);
1!
6012
                return -1;
1✔
6013
        }
6014
        struct gc_checkpoint *checkpoint;
6015
        gc_foreach_checkpoint_reverse(checkpoint) {
16✔
6016
                if (checkpoint_idx-- == 0)
13✔
6017
                        break;
11✔
6018
        }
6019
        if (checkpoint_idx >= 0) {
12✔
6020
                diag_set(ClientError, ER_MISSING_SNAPSHOT);
1!
6021
                return -1;
1✔
6022
        }
6023
        backup_gc = gc_ref_checkpoint(checkpoint, "backup");
11✔
6024
        int rc = engine_backup(&checkpoint->vclock, cb, cb_arg);
11✔
6025
        if (rc != 0) {
11!
6026
                gc_unref_checkpoint(backup_gc);
×
6027
                backup_gc = NULL;
×
6028
        }
6029
        return rc;
11✔
6030
}
6031

6032
void
6033
box_backup_stop(void)
11✔
6034
{
6035
        if (backup_gc != NULL) {
11✔
6036
                gc_unref_checkpoint(backup_gc);
10✔
6037
                backup_gc = NULL;
10✔
6038
        }
6039
}
11✔
6040

6041
const char *
6042
box_status(void)
34,813✔
6043
{
6044
        return status;
34,813✔
6045
}
6046

6047
static int
6048
box_reset_space_stat(struct space *space, void *arg)
782✔
6049
{
6050
        (void)arg;
6051
        for (uint32_t i = 0; i < space->index_count; i++)
2,420✔
6052
                index_reset_stat(space->index[i]);
1,638✔
6053
        return 0;
782✔
6054
}
6055

6056
void
6057
box_reset_stat(void)
28✔
6058
{
6059
        rmean_cleanup(rmean_box);
28✔
6060
        rmean_cleanup(rmean_error);
28✔
6061
        engine_reset_stat();
28✔
6062
        space_foreach(box_reset_space_stat, NULL);
28✔
6063
}
28✔
6064

6065
static void
6066
builtin_events_init(void)
6,384✔
6067
{
6068
        box_broadcast_fmt("box.id", "{}");
6,384✔
6069
        box_broadcast_fmt("box.schema", "{}");
6,384✔
6070
        box_broadcast_fmt("box.status", "{}");
6,384✔
6071
        box_broadcast_fmt("box.election", "{}");
6,384✔
6072
        box_broadcast_fmt("box.wal_error", "{}");
6,384✔
6073
        box_broadcast_fmt(box_ballot_event_key, "{}");
6,384✔
6074
        ev_timer_init(&box_broadcast_ballot_timer,
6,384✔
6075
                      box_broadcast_ballot_on_timeout, 0, 0);
6076
}
6,384✔
6077

6078
static void
6079
builtin_events_free(void)
6,054✔
6080
{
6081
        ev_timer_stop(loop(), &box_broadcast_ballot_timer);
6,054!
6082
}
6,054✔
6083

6084
void
6085
box_broadcast_id(void)
14,237✔
6086
{
6087
        char buf[1024];
6088
        char *w = buf;
14,237✔
6089
        w = mp_encode_map(w, 6);
14,237✔
6090
        w = mp_encode_str0(w, "id");
14,237✔
6091
        w = mp_encode_uint(w, instance_id);
14,237✔
6092
        w = mp_encode_str0(w, "instance_uuid");
14,237✔
6093
        w = mp_encode_uuid(w, &INSTANCE_UUID);
14,237!
6094
        w = mp_encode_str0(w, "instance_name");
14,237✔
6095
        if (*INSTANCE_NAME == 0)
14,237✔
6096
                w = mp_encode_nil(w);
12,846✔
6097
        else
6098
                w = mp_encode_str0(w, INSTANCE_NAME);
1,391✔
6099
        w = mp_encode_str0(w, "replicaset_uuid");
14,237✔
6100
        w = mp_encode_uuid(w, &REPLICASET_UUID);
14,237!
6101
        w = mp_encode_str0(w, "replicaset_name");
14,237✔
6102
        if (*REPLICASET_NAME == 0)
14,237✔
6103
                w = mp_encode_nil(w);
12,972✔
6104
        else
6105
                w = mp_encode_str0(w, REPLICASET_NAME);
1,265✔
6106
        w = mp_encode_str0(w, "cluster_name");
14,237✔
6107
        if (*CLUSTER_NAME == 0)
14,237✔
6108
                w = mp_encode_nil(w);
14,068✔
6109
        else
6110
                w = mp_encode_str0(w, CLUSTER_NAME);
169✔
6111

6112
        box_broadcast("box.id", strlen("box.id"), buf, w);
14,237!
6113

6114
        assert((size_t)(w - buf) < sizeof(buf));
14,237!
6115
}
14,237✔
6116
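/*
 * The broadcast above can be observed from Lua via the watcher API; a short
 * example (the value fields match the map encoded in this function):
 *
 *   box.watch('box.id', function(key, value)
 *       -- value.id, value.instance_uuid, value.instance_name,
 *       -- value.replicaset_uuid, value.replicaset_name, value.cluster_name
 *   end)
 */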

6117
void
6118
box_broadcast_status(void)
32,285✔
6119
{
6120
        char buf[1024];
6121
        char *w = buf;
32,285✔
6122
        w = mp_encode_map(w, 4);
32,285✔
6123
        w = mp_encode_str0(w, "is_ro");
32,285✔
6124
        w = mp_encode_bool(w, box_is_ro());
32,285✔
6125
        w = mp_encode_str0(w, "is_ro_cfg");
32,285✔
6126
        w = mp_encode_bool(w, cfg_geti("read_only"));
32,285!
6127
        w = mp_encode_str0(w, "status");
32,285✔
6128
        w = mp_encode_str0(w, box_status());
32,285!
6129
        w = mp_encode_str0(w, "dd_version");
32,285✔
6130
        w = mp_encode_str0(w, version_id_to_string(dd_version_id));
32,285✔
6131

6132
        box_broadcast("box.status", strlen("box.status"), buf, w);
32,285!
6133

6134
        assert((size_t)(w - buf) < sizeof(buf));
32,285!
6135
}
32,285✔
6136

6137
void
6138
box_broadcast_election(void)
12,441✔
6139
{
6140
        struct raft *raft = box_raft();
12,441✔
6141

6142
        char buf[1024];
6143
        char *w = buf;
12,441✔
6144
        w = mp_encode_map(w, 4);
12,441✔
6145
        w = mp_encode_str0(w, "term");
12,441✔
6146
        w = mp_encode_uint(w, raft->term);
12,441✔
6147
        w = mp_encode_str0(w, "role");
12,441✔
6148
        w = mp_encode_str0(w, raft_state_str(raft->state));
12,441!
6149
        w = mp_encode_str0(w, "is_ro");
12,441✔
6150
        w = mp_encode_bool(w, box_is_ro());
12,441✔
6151
        w = mp_encode_str0(w, "leader");
12,441✔
6152
        w = mp_encode_uint(w, raft->leader);
12,441✔
6153

6154
        box_broadcast("box.election", strlen("box.election"), buf, w);
12,441!
6155

6156
        assert((size_t)(w - buf) < sizeof(buf));
12,441!
6157
}
12,441✔
6158

6159
void
6160
box_broadcast_schema(void)
568,174✔
6161
{
6162
        char buf[1024];
6163
        char *w = buf;
568,174✔
6164
        w = mp_encode_map(w, 1);
568,174✔
6165
        w = mp_encode_str0(w, "version");
568,174✔
6166
        w = mp_encode_uint(w, box_schema_version());
568,174✔
6167

6168
        box_broadcast("box.schema", strlen("box.schema"), buf, w);
568,174!
6169

6170
        assert((size_t)(w - buf) < sizeof(buf));
568,174!
6171
}
568,174✔
6172

6173
void
6174
box_broadcast_ballot(void)
49,954✔
6175
{
6176
        char buf[2048];
6177
        char *w = buf;
49,954✔
6178
        struct ballot ballot;
6179
        box_process_vote(&ballot);
49,954!
6180
        w = mp_encode_ballot(w, &ballot);
49,954!
6181

6182
        box_broadcast(box_ballot_event_key, strlen(box_ballot_event_key), buf,
49,954!
6183
                      w);
6184

6185
        assert(mp_sizeof_ballot_max(&ballot) < sizeof(buf));
49,954!
6186
}
49,954✔
6187

6188
void
6189
box_read_ffi_disable(void)
2✔
6190
{
6191
        assert(box_read_ffi_is_disabled == (box_read_ffi_disable_count > 0));
2!
6192
        box_read_ffi_disable_count++;
2✔
6193
        box_read_ffi_is_disabled = true;
2✔
6194
}
2✔
6195

6196
void
6197
box_read_ffi_enable(void)
×
6198
{
6199
        assert(box_read_ffi_is_disabled);
×
6200
        assert(box_read_ffi_disable_count > 0);
×
6201
        if (--box_read_ffi_disable_count == 0)
×
6202
                box_read_ffi_is_disabled = false;
×
6203
}
×
6204
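/*
 * Disable/enable calls are expected to be balanced; the counter allows
 * nesting, and FFI reads come back only once every disable has been matched:
 *
 *   box_read_ffi_disable();
 *   box_read_ffi_disable();
 *   box_read_ffi_enable();   // still disabled, count == 1
 *   box_read_ffi_enable();   // enabled again, count == 0
 */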

6205
int
6206
box_generate_space_id(uint32_t *new_space_id, bool is_temporary)
79,009✔
6207
{
6208
        assert(new_space_id != NULL);
79,009!
6209
        uint32_t id_range_begin = !is_temporary ?
79,009✔
6210
                BOX_SYSTEM_ID_MAX + 1 : BOX_SPACE_ID_TEMPORARY_MIN;
6211
        uint32_t id_range_end = !is_temporary ?
79,009✔
6212
                (uint32_t)BOX_SPACE_ID_TEMPORARY_MIN :
6213
                (uint32_t)BOX_SPACE_MAX + 1;
15✔
6214
        char key_buf[16];
6215
        char *key_end = key_buf;
79,009✔
6216
        key_end = mp_encode_array(key_end, 1);
79,009✔
6217
        key_end = mp_encode_uint(key_end, id_range_end);
79,009✔
6218
        struct credentials *orig_credentials = effective_user();
79,009!
6219
        fiber_set_user(fiber(), &admin_credentials);
79,009!
6220
        auto guard = make_scoped_guard([=] {
79,009✔
6221
                fiber_set_user(fiber(), orig_credentials);
79,009!
6222
        });
158,018!
6223
        box_iterator_t *it = box_index_iterator(BOX_SPACE_ID, 0, ITER_LT,
79,009!
6224
                                                key_buf, key_end);
6225
        if (it == NULL)
79,009✔
6226
                return -1;
5✔
6227
        struct tuple *res = NULL;
79,004✔
6228
        int rc = box_iterator_next(it, &res);
79,004!
6229
        box_iterator_free(it);
79,004!
6230
        if (rc != 0)
79,004!
6231
                return -1;
×
6232
        assert(res != NULL);
79,004!
6233
        uint32_t max_id = 0;
79,004✔
6234
        rc = tuple_field_u32(res, 0, &max_id);
79,004!
6235
        assert(rc == 0);
79,004!
6236
        if (max_id < id_range_begin)
79,004✔
6237
                max_id = id_range_begin - 1;
4,089✔
6238
        *new_space_id = space_cache_find_next_unused_id(max_id);
79,004!
6239
        /* Try again if overflowed. */
6240
        if (*new_space_id >= id_range_end) {
79,004✔
6241
                *new_space_id =
1✔
6242
                        space_cache_find_next_unused_id(id_range_begin - 1);
1!
6243
                /*
6244
                 * The second overflow means all ids are occupied.
6245
                 * This situation cannot happen in the real world with limited
6246
                 * memory, and it's pretty hard to test, so let's just panic
6247
                 * if we've run out of ids.
6248
                 */
6249
                if (*new_space_id >= id_range_end)
1!
6250
                        panic("Space id limit is reached");
×
6251
        }
6252
        return 0;
79,004✔
6253
}
6254
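/*
 * A sketch of the id ranges used above (bounds are the constants referenced
 * in this function; their exact values live in the corresponding headers):
 *
 *   ordinary spaces:  (BOX_SYSTEM_ID_MAX, BOX_SPACE_ID_TEMPORARY_MIN)
 *   temporary spaces: [BOX_SPACE_ID_TEMPORARY_MIN, BOX_SPACE_MAX]
 *
 * When the upper bound is reached, the search wraps around once to the
 * beginning of the range; a second overflow means the range is exhausted
 * and the function panics.
 */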

6255
static void
6256
on_garbage_collection(void)
1,322✔
6257
{
6258
        box_broadcast_ballot();
1,322✔
6259
}
1,322✔
6260

6261
static void
6262
box_storage_init(void)
4,476✔
6263
{
6264
        assert(!is_storage_initialized);
4,476!
6265
        /* Join the cord interconnect as "tx" endpoint. */
6266
        fiber_pool_create(&tx_fiber_pool, "tx",
4,476✔
6267
                          IPROTO_MSG_MAX_MIN * IPROTO_FIBER_POOL_SIZE_FACTOR,
6268
                          box_fiber_pool_idle_timeout);
6269
        /* Add an extra endpoint for WAL wake up/rollback messages. */
6270
        cbus_endpoint_create(&tx_prio_endpoint, "tx_prio", tx_prio_cb,
4,476✔
6271
                             &tx_prio_endpoint);
6272

6273
        rmean_box = rmean_new(iproto_type_strs, IPROTO_TYPE_STAT_MAX);
4,476✔
6274
        rmean_error = rmean_new(rmean_error_strings, RMEAN_ERROR_LAST);
4,476✔
6275

6276
        gc_init(on_garbage_collection);
4,476✔
6277
        engine_init();
4,476✔
6278
        schema_init();
4,475✔
6279
        replication_init(cfg_geti_default("replication_threads", 1));
4,475✔
6280
        iproto_init(cfg_geti("iproto_threads"));
4,474✔
6281
        sql_init();
4,474✔
6282
        audit_log_init();
4,474✔
6283
        security_cfg();
4,474✔
6284

6285
        int64_t wal_max_size = box_check_wal_max_size(
4,474✔
6286
                cfg_geti64("wal_max_size"));
6287
        enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
4,474✔
6288
        double wal_retention_period = box_check_wal_retention_period_xc();
4,474✔
6289
        if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_size,
4,474✔
6290
                     wal_retention_period, &INSTANCE_UUID,
6291
                     &instance_vclock_storage, on_wal_garbage_collection,
6292
                     on_wal_checkpoint_threshold) != 0) {
4,474!
6293
                diag_raise();
×
6294
        }
6295
        is_storage_initialized = true;
4,474✔
6296
}
4,474✔
6297

6298
static void
6299
box_storage_free(void)
6,054✔
6300
{
6301
        if (!is_storage_initialized)
6,054✔
6302
                return;
1,718✔
6303
        iproto_free();
4,336✔
6304
        replication_free();
4,336✔
6305
        gc_free();
4,336✔
6306
        engine_free();
4,336✔
6307
        /* schema_free(); */
6308
        wal_free();
4,336✔
6309
        flightrec_free();
4,336✔
6310
        audit_log_free();
4,336✔
6311
        sql_built_in_functions_cache_free();
4,336✔
6312
        fiber_pool_destroy(&tx_fiber_pool);
4,336✔
6313
        is_storage_initialized = false;
4,336✔
6314
}
6315

6316
void
6317
box_init(void)
6,384✔
6318
{
6319
        iproto_constants_init();
6,384✔
6320
        iproto_features_init();
6,384✔
6321
        port_init();
6,384✔
6322
        box_on_recovery_state_event =
6,384✔
6323
                event_get("box.ctl.on_recovery_state", true);
6,384✔
6324
        event_ref(box_on_recovery_state_event);
6,384✔
6325
        box_on_shutdown_event = event_get("box.ctl.on_shutdown", true);
6,384✔
6326
        event_ref(box_on_shutdown_event);
6,384✔
6327
        tarantool_trigger_on_change_event =
6,384✔
6328
                event_get("tarantool.trigger.on_change", true);
6,384✔
6329
        event_ref(tarantool_trigger_on_change_event);
6,384✔
6330
        event_on_change(&box_trigger_on_change_trigger);
6,384✔
6331
        txn_event_trigger_init();
6,384✔
6332
        msgpack_init();
6,384✔
6333
        fiber_cond_create(&ro_cond);
6,384✔
6334
        auth_init();
6,384✔
6335
        security_init();
6,384✔
6336
        space_cache_init();
6,384✔
6337
        user_cache_init();
6,384✔
6338
        /*
6339
         * The order is important: to initialize sessions, we need to access the
6340
         * admin user, which is used as a default session user when running
6341
         * triggers.
6342
         */
6343
        session_init();
6,384✔
6344
        schema_module_init();
6,384✔
6345
        if (tuple_init(lua_hash) != 0)
6,384!
6346
                diag_raise();
×
6347
        txn_limbo_init();
6,384✔
6348
        sequence_init();
6,384✔
6349
        box_watcher_init();
6,384✔
6350
        box_raft_init();
6,384✔
6351
        wal_ext_init();
6,384✔
6352
        /*
6353
         * Default built-in events to help users distinguish an event that is not
6354
         * supported from box.cfg not being called yet.
6355
         */
6356
        builtin_events_init();
6,384✔
6357
        crash_callback = box_crash_callback;
6,384✔
6358
        mempool_create(&sync_trigger_data_pool, &cord()->slabc,
6,384!
6359
                       sizeof(struct sync_trigger_data));
6360
}
6,384✔
6361

6362
void
6363
box_init_instance_vclock(const struct vclock *vclock)
731✔
6364
{
6365
        if (vclock_is_set(&instance_vclock_storage))
731!
6366
                panic("Instance vclock can be initialized only once");
×
6367
        vclock_copy(&instance_vclock_storage, vclock);
731✔
6368
}
731✔
6369

6370
/** Shutdown box storage i.e. stop parts that need TX loop running. */
6371
static void
6372
box_storage_shutdown()
6,054✔
6373
{
6374
        if (!is_storage_initialized)
6,054✔
6375
                return;
1,717✔
6376
        is_storage_shutdown = true;
4,337✔
6377
        if (iproto_shutdown(box_shutdown_timeout) != 0) {
4,337!
6378
                diag_log();
×
6379
                panic("cannot gracefully shutdown iproto");
×
6380
        }
6381
        replication_shutdown();
4,337✔
6382
        box_raft_shutdown();
4,337✔
6383
        txn_limbo_shutdown();
4,337✔
6384
        gc_shutdown();
4,337✔
6385
        engine_shutdown();
4,337✔
6386
        fiber_pool_shutdown(&tx_fiber_pool);
4,337✔
6387
}
6388

6389
void
6390
box_shutdown(void)
6,078✔
6391
{
6392
        /*
6393
         * The watcher should be shut down before subsystem shutdown because
6394
         * it may execute client code. It can be shut down before or
6395
         * after client fiber shutdown. Both cases are correct. But
6396
         * if we do it after, we may get noisy fiber creation failures in the log
6397
         * for async watcher execution.
6398
         */
6399
        box_watcher_shutdown();
6,078✔
6400
        /*
6401
         * Iproto connections should be dropped before client fibers because
6402
         * they can produce new ones in `tx_fiber_pool`.
6403
         */
6404
        if (iproto_drop_connections(box_shutdown_timeout) != 0) {
6,078✔
6405
                diag_log();
1✔
6406
                panic("cannot gracefully shutdown iproto requests");
1!
6407
        }
6408
        /*
6409
         * Finish client fibers before other subsystems shutdown so that
6410
         * we won't get unexpected request to shutdown subsystems from
6411
         * client code.
6412
         */
6413
        if (fiber_shutdown(box_shutdown_timeout) != 0) {
6,077✔
6414
                diag_log();
1✔
6415
                panic("cannot gracefully shutdown client fibers");
1!
6416
        }
6417
        box_storage_shutdown();
6,054✔
6418
}
6,054✔
6419

6420
void
6421
box_free(void)
6,054✔
6422
{
6423
        /* References engines. */
6424
        space_cache_destroy();
6,054✔
6425
        /* References engine tuples. */
6426
        txn_limbo_free();
6,054✔
6427
        box_storage_free();
6,054✔
6428
        builtin_events_free();
6,054✔
6429
        security_free();
6,054✔
6430
        /* User auth references auth methods. */
6431
        user_cache_free();
6,054✔
6432
        auth_free();
6,054✔
6433
        wal_ext_free();
6,054✔
6434
        box_watcher_free();
6,054✔
6435
        box_raft_free();
6,054✔
6436
        sequence_free();
6,054✔
6437
        trigger_clear(&box_trigger_on_change_trigger);
6,054✔
6438
        event_unref(box_on_recovery_state_event);
6,054✔
6439
        box_on_recovery_state_event = NULL;
6,054✔
6440
        event_unref(tarantool_trigger_on_change_event);
6,054✔
6441
        tarantool_trigger_on_change_event = NULL;
6,054✔
6442
        txn_event_trigger_free();
6,054✔
6443
        tuple_free();
6,054✔
6444
        port_free();
6,054✔
6445
        iproto_constants_free();
6,054✔
6446
        mempool_destroy(&sync_trigger_data_pool);
6,054✔
6447
        box_lua_call_runtime_priv_reset();
6,054✔
6448
        /* schema_module_free(); */
6449
        /* session_free(); */
6450
}
6,054✔