• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / tarantool / 12281896599

11 Dec 2024 05:41PM UTC coverage: 87.372% (+0.02%) from 87.355%
12281896599

Pull #10883

github

web-flow
Merge 1ffe9d3e4 into 4c302320c
Pull Request #10883: Suboptimal msgpuck support

69764 of 123664 branches covered (56.41%)

114 of 157 new or added lines in 5 files covered. (72.61%)

48 existing lines in 12 files now uncovered.

102736 of 117585 relevant lines covered (87.37%)

2941228.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.12
/src/box/relay.cc
1
/*
2
 * Copyright 2010-2016, Tarantool AUTHORS, please see AUTHORS file.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 * 1. Redistributions of source code must retain the above
9
 *    copyright notice, this list of conditions and the
10
 *    following disclaimer.
11
 *
12
 * 2. Redistributions in binary form must reproduce the above
13
 *    copyright notice, this list of conditions and the following
14
 *    disclaimer in the documentation and/or other materials
15
 *    provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
18
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
19
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
21
 * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
22
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
25
 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
28
 * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
29
 * SUCH DAMAGE.
30
 */
31
#include "relay.h"
32

33
#include "trivia/config.h"
34
#include "tt_static.h"
35
#include "scoped_guard.h"
36
#include "cbus.h"
37
#include "errinj.h"
38
#include "fiber.h"
39
#include "memory.h"
40
#include "say.h"
41

42
#include "coio.h"
43
#include "coio_task.h"
44
#include "engine.h"
45
#include "gc.h"
46
#include "iostream.h"
47
#include "iproto_constants.h"
48
#include "recovery.h"
49
#include "replication.h"
50
#include "trigger.h"
51
#include "vclock/vclock.h"
52
#include "version.h"
53
#include "xrow.h"
54
#include "xrow_io.h"
55
#include "xstream.h"
56
#include "wal.h"
57
#include "txn_limbo.h"
58
#include "raft.h"
59
#include "box.h"
60

61
/**
 * Cbus message to send status updates from relay to tx thread.
 * Filled in by the relay thread and consumed in tx by
 * tx_status_update(), then routed back via relay_status_update().
 */
struct relay_status_msg {
	/** Parent */
	struct cmsg msg;
	/** Relay instance */
	struct relay *relay;
	/** Replica's last known raft term. */
	uint64_t term;
	/** Replica vclock. */
	struct vclock vclock;
	/** Last replicated transaction timestamp. */
	double txn_lag;
	/** Last vclock sync received in replica's response. */
	uint64_t vclock_sync;
};
78

79
/**
 * Cbus message to update replica gc state in tx thread.
 * Allocated on the heap in relay_on_close_log_f() and freed in
 * tx_gc_advance() after the gc consumer is advanced.
 */
struct relay_gc_msg {
	/** Parent */
	struct cmsg msg;
	/**
	 * Link in the list of pending gc messages,
	 * see relay::pending_gc.
	 */
	struct stailq_entry in_pending;
	/** Relay instance */
	struct relay *relay;
	/** Vclock to advance to */
	struct vclock vclock;
};
95

96
/**
 * Cbus message to push raft messages to relay.
 */
struct relay_raft_msg {
	/** Parent cbus message. */
	struct cmsg base;
	/*
	 * NOTE(review): two hops — presumably tx->relay and back, mirroring
	 * the status message round-trip; confirm against the dispatch code.
	 */
	struct cmsg_hop route[2];
	/** The Raft request to be delivered to the replica. */
	struct raft_request req;
	/** Vclock accompanying the request. */
	struct vclock vclock;
	/** Relay instance the message is addressed to. */
	struct relay *relay;
};
106

107

108
/** State of a replication relay. */
struct relay {
	/** Replica connection */
	struct iostream *io;
	/** A stream to write rows to the remote peer. */
	struct xrow_stream xrow_stream;
	/** Request sync */
	uint64_t sync;
	/** Last ACK sent to the replica. */
	struct relay_heartbeat last_sent_ack;
	/** Last ACK received from the replica. */
	struct applier_heartbeat last_recv_ack;
	/** Recovery instance to read xlog from the disk */
	struct recovery *r;
	/** Xstream argument to recovery */
	struct xstream stream;
	/** A region used to save rows when collecting transactions. */
	struct lsregion lsregion;
	/** A monotonically growing identifier for lsregion allocations. */
	int64_t lsr_id;
	/** The tsn of the currently read transaction. */
	int64_t read_tsn;
	/** A list of rows making up the currently read transaction. */
	struct rlist current_tx;
	/** Vclock to stop playing xlogs */
	struct vclock stop_vclock;
	/** Remote replica */
	struct replica *replica;
	/** WAL event watcher. */
	struct wal_watcher wal_watcher;
	/** Relay reader cond, signalled on every ACK from the replica. */
	struct fiber_cond reader_cond;
	/** Relay diagnostics. */
	struct diag diag;
	/** Replication slave version. */
	uint32_t version_id;
	/**
	 * The biggest Raft term that node has broadcasted. Used to synchronize
	 * Raft term (from tx thread) and PROMOTE (from WAL) dispatch.
	 */
	uint64_t sent_raft_term;
	/**
	 * A filter of replica ids whose rows should be ignored.
	 * Each set filter bit corresponds to a replica id whose
	 * rows shouldn't be relayed. The list of ids to ignore
	 * is passed by the replica on subscribe.
	 */
	uint32_t id_filter;
	/**
	 * Local vclock at the moment of subscribe, used to check
	 * dataset on the other side and send missing data rows if any.
	 */
	struct vclock local_vclock_at_subscribe;

	/** Endpoint to receive messages from WAL. */
	struct cbus_endpoint wal_endpoint;
	/**
	 * Endpoint to receive messages from TX. Having the 2 endpoints
	 * separated helps to synchronize the data coming from TX and WAL. Such
	 * as term bumps from TX with PROMOTE rows from WAL.
	 */
	struct cbus_endpoint tx_endpoint;
	/** A pipe from 'relay' thread to 'tx' */
	struct cpipe tx_pipe;
	/** A pipe from 'tx' thread to 'relay' */
	struct cpipe relay_pipe;
	/** Status message */
	struct relay_status_msg status_msg;
	/**
	 * List of garbage collection messages awaiting
	 * confirmation from the replica.
	 */
	struct stailq pending_gc;
	/** Time when last row was sent to the peer. */
	double last_row_time;
	/** Time when last heartbeat was sent to the peer. */
	double last_heartbeat_time;
	/** Time of last communication with the tx thread. */
	double tx_seen_time;
	/**
	 * A time difference between the moment when we
	 * wrote a transaction to the local WAL and when
	 * this transaction has been replicated to remote
	 * node (ie written to node's WAL) so that ACK get
	 * received.
	 */
	double txn_lag;
	/** Relay sync state. */
	enum relay_state state;
	/** Whether relay should speed up the next heartbeat dispatch. */
	bool need_new_vclock_sync;
	/** Fields below are accessed from the tx thread. */
	struct {
		/* Align to prevent false-sharing with tx thread */
		alignas(CACHELINE_SIZE)
		/** Known relay vclock. */
		struct vclock vclock;
		/**
		 * Transaction downstream lag to be accessed
		 * from TX thread only.
		 */
		double txn_lag;
		/** Known vclock sync received in response from replica. */
		uint64_t vclock_sync;
		/**
		 * True if the relay is ready to accept messages via the cbus.
		 */
		bool is_paired;
		/**
		 * A pair of raft messages travelling between tx and relay
		 * threads. While one is en route, the other is ready to save
		 * the next incoming raft message.
		 */
		struct relay_raft_msg raft_msgs[2];
		/**
		 * Id of the raft message waiting in tx thread and ready to
		 * save Raft requests. May be either 0 or 1.
		 */
		int raft_ready_msg;
		/** Whether raft_ready_msg holds a saved Raft message */
		bool is_raft_push_pending;
		/**
		 * Whether any of the messages is en route between tx and
		 * relay.
		 */
		bool is_raft_push_sent;
	} tx;
	/**
	 * The fiber handling the subscribe request: the corresponding relay can
	 * be cancelled through it.
	 */
	struct fiber *subscribe_fiber;
};
240

241
struct diag*
242
relay_get_diag(struct relay *relay)
232✔
243
{
244
        return &relay->diag;
232✔
245
}
246

247
enum relay_state
248
relay_get_state(const struct relay *relay)
41,647✔
249
{
250
        return relay->state;
41,647✔
251
}
252

253
const struct vclock *
254
relay_vclock(const struct relay *relay)
6,266✔
255
{
256
        return &relay->tx.vclock;
6,266✔
257
}
258

259
double
260
relay_last_row_time(const struct relay *relay)
6,272✔
261
{
262
        return relay->last_row_time;
6,272✔
263
}
264

265
double
266
relay_txn_lag(const struct relay *relay)
6,224✔
267
{
268
        return relay->tx.txn_lag;
6,224✔
269
}
270

271
static void
272
relay_send(struct relay *relay, struct xrow_header *packet);
273
static void
274
relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row);
275

276
/** One iteration of the subscription loop - bump heartbeats, TX endpoints. */
277
static void
278
relay_subscribe_update(struct relay *relay);
279

280
/** Process a single row from the WAL stream. */
281
static void
282
relay_process_row(struct xstream *stream, struct xrow_header *row);
283

284
struct relay *
285
relay_new(struct replica *replica)
14,261✔
286
{
287
        /*
288
         * We need to use aligned_alloc for this struct, because it's have
289
         * specific alignas(CACHELINE_SIZE). If we use simple malloc or same
290
         * functions, we will get member access within misaligned address
291
         * (Use clang UB Sanitizer, to make sure of this)
292
         */
293
        assert((sizeof(struct relay) % alignof(struct relay)) == 0);
294
        struct relay *relay = xalloc_object(struct relay);
14,261!
295
        memset(relay, 0, sizeof(struct relay));
14,261✔
296
        relay->replica = replica;
14,261✔
297
        relay->last_row_time = ev_monotonic_now(loop());
14,261!
298
        fiber_cond_create(&relay->reader_cond);
14,261✔
299
        diag_create(&relay->diag);
14,261✔
300
        stailq_create(&relay->pending_gc);
14,261✔
301
        relay->state = RELAY_OFF;
14,261✔
302
        return relay;
14,261✔
303
}
304

305
/** A callback recovery calls every now and then to unblock the event loop. */
static void
relay_yield(struct xstream *stream)
{
	(void) stream;
	/* A zero-length sleep yields control to other fibers. */
	fiber_sleep(0);
}
312

313
static void
314
relay_send_heartbeat_on_timeout(struct relay *relay);
315

316
/** A callback for recovery to send heartbeats while scanning a WAL. */
static void
relay_subscribe_on_wal_yield_f(struct xstream *stream)
{
	struct relay *relay = container_of(stream, struct relay, stream);
	/* Bump heartbeats and service the TX endpoint before yielding. */
	relay_subscribe_update(relay);
	fiber_sleep(0);
}
324

325
/**
 * Put a relay into the RELAY_FOLLOW state over the given connection.
 *
 * @param relay          Relay to start.
 * @param io             Replica connection; not owned by the relay.
 * @param sync           Sync of the request that started the relay.
 * @param stream_write   Row handler for the recovery xstream.
 * @param stream_cb      Yield callback for the recovery xstream.
 * @param sent_raft_term Initial "biggest broadcasted Raft term" value.
 */
static void
relay_start(struct relay *relay, struct iostream *io, uint64_t sync,
	    void (*stream_write)(struct xstream *, struct xrow_header *),
	    void (*stream_cb)(struct xstream *), uint64_t sent_raft_term)
{
	xstream_create(&relay->stream, stream_write, stream_cb);
	/*
	 * Clear the diagnostics at start, in case it has the old
	 * error message which we keep around to display in
	 * box.info.replication.
	 */
	diag_clear(&relay->diag);
	relay->io = io;
	relay->sync = sync;
	relay->state = RELAY_FOLLOW;
	relay->sent_raft_term = sent_raft_term;
	relay->need_new_vclock_sync = false;
	/* All "last activity" clocks start at "now". */
	relay->last_row_time = ev_monotonic_now(loop());
	relay->tx_seen_time = relay->last_row_time;
	relay->last_heartbeat_time = relay->last_row_time;
	/* Never send rows for REPLICA_ID_NIL to anyone */
	relay->id_filter = 1 << REPLICA_ID_NIL;
	memset(&relay->status_msg, 0, sizeof(relay->status_msg));
}

350
/**
 * Called by a relay thread right before termination.
 * Releases resources that must be freed inside the relay cord.
 */
static void
relay_exit(struct relay *relay)
{
	/* Test hook: delay the exit to widen shutdown race windows. */
	struct errinj *inj = errinj(ERRINJ_RELAY_EXIT_DELAY, ERRINJ_DOUBLE);
	if (inj != NULL && inj->dparam > 0)
		fiber_sleep(inj->dparam);

	xrow_stream_destroy(&relay->xrow_stream);
	/*
	 * Destroy the recovery context. We MUST do it in
	 * the relay thread, because it contains an xlog
	 * cursor, which must be closed in the same thread
	 * that opened it (it uses cord's slab allocator).
	 */
	recovery_delete(relay->r);
	relay->r = NULL;
	lsregion_destroy(&relay->lsregion);
}
371

372
/**
 * Stop a relay: discard pending gc messages, drop the recovery
 * context (if any) and move the relay to the RELAY_STOPPED state.
 */
static void
relay_stop(struct relay *relay)
{
	/* The queued gc messages were never pushed to tx; just free them. */
	struct relay_gc_msg *gc_msg, *next_gc_msg;
	stailq_foreach_entry_safe(gc_msg, next_gc_msg,
				  &relay->pending_gc, in_pending) {
		free(gc_msg);
	}
	stailq_create(&relay->pending_gc);
	relay->io = NULL;
	if (relay->r != NULL)
		recovery_delete(relay->r);
	relay->r = NULL;
	relay->state = RELAY_STOPPED;
	/*
	 * If relay is stopped then lag statistics should
	 * be updated on next new ACK packets obtained.
	 */
	relay->txn_lag = 0;
	relay->tx.txn_lag = 0;
	relay->tx.vclock_sync = 0;
	relay->subscribe_fiber = NULL;
}
395

396
/**
 * Destroy a relay. Stops it first if it is still following,
 * then releases all tx-side resources and frees the object.
 */
void
relay_delete(struct relay *relay)
{
	if (relay->state == RELAY_FOLLOW)
		relay_stop(relay);
	fiber_cond_destroy(&relay->reader_cond);
	diag_destroy(&relay->diag);
	/* Poison the freed memory to catch use-after-free early. */
	TRASH(relay);
	free(relay);
}
406

407
/**
 * Name the current cord after the peer address, e.g.
 * "relay/<host:port>", for readable thread listings and logs.
 */
static void
relay_set_cord_name(int fd)
{
	char name[FIBER_NAME_MAX];
	struct sockaddr_storage peer;
	socklen_t addrlen = sizeof(peer);
	if (getpeername(fd, ((struct sockaddr*)&peer), &addrlen) == 0) {
		snprintf(name, sizeof(name), "relay/%s",
			 sio_strfaddr((struct sockaddr *)&peer, addrlen));
	} else {
		/* Peer address unavailable — fall back to a placeholder. */
		snprintf(name, sizeof(name), "relay/<unknown>");
	}
	cord_set_name(name);
}
421

422
/**
 * Common initialization performed inside a newly started relay
 * cord: enable coio, name the thread, and set up the per-cord
 * transaction-collection state.
 */
static void
relay_cord_init(struct relay *relay)
{
	coio_enable();
	relay_set_cord_name(relay->io->fd);
	/* Rows of in-progress transactions are saved on this lsregion. */
	lsregion_create(&relay->lsregion, &runtime);
	relay->lsr_id = 0;
	relay->read_tsn = 0;
	rlist_create(&relay->current_tx);
	xrow_stream_create(&relay->xrow_stream);
}
433

434
/**
 * Flush any relay stream contents to the remote peer immediately.
 * Returns 0 on success, -1 on write failure (the error is left in
 * the fiber diag — callers read it via diag_get()/diag_last_error()).
 */
static inline int
relay_flush(struct relay *relay)
{
	if (xrow_stream_flush(&relay->xrow_stream, relay->io) < 0)
		return -1;
#ifndef NDEBUG
	/* Test hook: emulate a slow network by sleeping after the flush. */
	struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
	if (inj != NULL && inj->dparam > 0)
		fiber_sleep(inj->dparam);
#endif
	return 0;
}
447

448
/**
 * Check if relay stream has enough data to flush and flush it.
 * Unlike relay_flush(), may be a no-op when the buffered amount
 * is below the stream's threshold. Returns 0 on success, -1 on
 * write failure (error stored in the fiber diag).
 */
static inline int
relay_check_flush(struct relay *relay)
{
	if (xrow_stream_check_flush(&relay->xrow_stream, relay->io) < 0)
		return -1;
#ifndef NDEBUG
	/* Test hook: emulate a slow network by sleeping after the flush. */
	struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
	if (inj != NULL && inj->dparam > 0)
		fiber_sleep(inj->dparam);
#endif
	return 0;
}
461

462
/**
 * Serve the initial-join stage: freeze a read view in the engines
 * and stream it to a joining replica over @a io. Uses a throwaway
 * relay that is destroyed on return. Raises on network failure.
 */
void
relay_initial_join(struct iostream *io, uint64_t sync, struct vclock *vclock,
		   uint32_t replica_version_id,
		   struct checkpoint_cursor *cursor)
{
	struct relay *relay = relay_new(NULL);
	relay_start(relay, io, sync, relay_send_initial_join_row, relay_yield,
		    UINT64_MAX);
	xrow_stream_create(&relay->xrow_stream);
	relay->version_id = replica_version_id;
	/* Tear the ad-hoc relay down on any exit path, including throws. */
	auto relay_guard = make_scoped_guard([=] {
		xrow_stream_destroy(&relay->xrow_stream);
		relay_stop(relay);
		relay_delete(relay);
	});

	/* Freeze a read view in engines. */
	struct engine_join_ctx ctx;
	memset(&ctx, 0, sizeof(ctx));
	/*
	 * Version is present starting with 2.7.3, 2.8.2, 2.9.1
	 * All these versions know of additional META stage of initial join.
	 */
	ctx.send_meta = replica_version_id > 0;
	ctx.cursor = cursor;
	/*
	 * If cursor is passed, we should respond with its vclock because
	 * it will actually be vclock of a last sent row.
	 */
	ctx.vclock = cursor != NULL ? cursor->vclock : vclock;
	engine_prepare_join_xc(&ctx);
	auto join_guard = make_scoped_guard([&] {
		engine_complete_join(&ctx);
	});

	/* Send read view to the replica. */
	engine_join_xc(&ctx, &relay->stream);
	if (relay_flush(relay) < 0)
		diag_raise();
}
502

503
/**
 * Body of the "final_join" cord (started by relay_final_join()):
 * replay local WALs into the replica's stream until stop_vclock
 * is reached, then flush. Returns 0; errors are thrown and the
 * relay is torn down by the guard.
 */
int
relay_final_join_f(va_list ap)
{
	struct relay *relay = va_arg(ap, struct relay *);
	auto guard = make_scoped_guard([=] { relay_exit(relay); });

	relay_cord_init(relay);

	/* Send all WALs until stop_vclock */
	assert(relay->stream.write != NULL);
	recover_remaining_wals(relay->r, &relay->stream,
			       &relay->stop_vclock, true);
	if (relay_flush(relay) < 0)
		diag_raise();
	assert(vclock_compare(&relay->r->vclock, &relay->stop_vclock) == 0);
	return 0;
}
520

521
/**
 * Serve the final-join stage: stream WAL rows between
 * @a start_vclock and @a stop_vclock to the replica from a
 * dedicated "final_join" cord, joining it before returning.
 * Raises on failure.
 */
void
relay_final_join(struct replica *replica, struct iostream *io, uint64_t sync,
		 const struct vclock *start_vclock,
		 const struct vclock *stop_vclock)
{
	/*
	 * As a new thread is started for the final join stage, its cancellation
	 * should be handled properly during an unexpected shutdown, so, we
	 * reuse the subscribe relay in order to cancel the final join thread
	 * during replication_free().
	 */
	struct relay *relay = replica->relay;
	assert(relay->state != RELAY_FOLLOW);

	relay_start(relay, io, sync, relay_process_row, relay_yield,
		    UINT64_MAX);
	auto relay_guard = make_scoped_guard([=] {
		relay_stop(relay);
	});
	/*
	 * Save the first vclock as 'received'. Because it was really received.
	 */
	vclock_copy_ignore0(&relay->last_recv_ack.vclock, start_vclock);
	relay->r = recovery_new(wal_dir(), false, start_vclock);
	vclock_copy(&relay->stop_vclock, stop_vclock);

	/* Run the replay in its own cord and wait for it to finish. */
	struct cord cord;
	int rc = cord_costart(&cord, "final_join", relay_final_join_f, relay);
	if (rc == 0)
		rc = cord_cojoin(&cord);
	if (rc != 0)
		diag_raise();

	ERROR_INJECT(ERRINJ_RELAY_FINAL_JOIN,
		     tnt_raise(ClientError, ER_INJECTION, "relay final join"));

	ERROR_INJECT(ERRINJ_RELAY_FINAL_SLEEP, {
		while (vclock_compare(stop_vclock, instance_vclock) == 0)
			fiber_sleep(0.001);
	});
}
562

563
/** Check if status update is needed and send it if possible. */
564
static void
565
relay_check_status_needs_update(struct relay *relay);
566

567
/**
 * The message which updated tx thread with a new vclock has returned back
 * to the relay.
 */
static void
relay_status_update(struct cmsg *msg)
{
	/*
	 * NOTE(review): clearing the route appears to mark the message as
	 * idle so relay_check_status_needs_update() can re-dispatch it —
	 * confirm against that function.
	 */
	msg->route = NULL;
	struct relay_status_msg *status = (struct relay_status_msg *)msg;
	struct relay *relay = status->relay;
	/* The returning message doubles as a tx-thread liveness signal. */
	relay->tx_seen_time = ev_monotonic_now(loop());
	relay_check_status_needs_update(relay);
}
580

581
/**
 * Deliver a fresh relay vclock to tx thread.
 * Runs in tx: updates the tx-visible relay state, processes the
 * replica's Raft term, acknowledges limbo transactions, fires
 * on_ack triggers and sends the message back to the relay.
 */
static void
tx_status_update(struct cmsg *msg)
{
	struct relay_status_msg *status = (struct relay_status_msg *)msg;
	struct relay *relay = status->relay;
	vclock_copy(&relay->tx.vclock, &status->vclock);
	relay->tx.txn_lag = status->txn_lag;
	relay->tx.vclock_sync = status->vclock_sync;

	struct replication_ack ack;
	ack.source = status->relay->replica->id;
	ack.vclock = &status->vclock;
	ack.vclock_sync = status->vclock_sync;
	bool anon = status->relay->replica->anon;
	/*
	 * It is important to process the term first and freeze the limbo before
	 * an ACK if the term was bumped. This is because majority of the
	 * cluster might be already living in a new term and this ACK is coming
	 * from one of such nodes. It means that the row was written on the
	 * replica but can't CONFIRM/ROLLBACK - the old term has ended, new one
	 * has no result yet, need a PROMOTE.
	 */
	raft_process_term(box_raft(), status->term, ack.source);
	/*
	 * Let pending synchronous transactions know, which of
	 * them were successfully sent to the replica. Acks are
	 * collected only by the transactions originator (which is
	 * the single master in 100% so far). Other instances wait
	 * for master's CONFIRM message instead.
	 */
	if (txn_limbo_is_owned_by_current_instance(&txn_limbo) && !anon) {
		txn_limbo_ack(&txn_limbo, ack.source,
			      vclock_get(ack.vclock, instance_id));
	}
	trigger_run(&replicaset.on_ack, &ack);

	/* Return the message to the relay so it can be reused. */
	static const struct cmsg_hop route[] = {
		{relay_status_update, NULL}
	};
	cmsg_init(msg, route);
	cpipe_push(&status->relay->relay_pipe, msg);
}
626

627
/**
 * Update replica gc state in tx thread.
 * Advances the replica's gc consumer to the acknowledged vclock
 * and frees the message allocated in relay_on_close_log_f().
 */
static void
tx_gc_advance(struct cmsg *msg)
{
	struct relay_gc_msg *m = (struct relay_gc_msg *)msg;
	gc_consumer_advance(m->relay->replica->gc, &m->vclock);
	free(m);
}
637

638
/**
 * Trigger fired when recovery closes an xlog file: queue a gc
 * message capturing the current recovery vclock. The message is
 * pushed to tx only after the replica acknowledges the rows, see
 * relay_schedule_pending_gc().
 */
static int
relay_on_close_log_f(struct trigger *trigger, void * /* event */)
{
	static const struct cmsg_hop route[] = {
		{tx_gc_advance, NULL}
	};
	struct relay *relay = (struct relay *)trigger->data;
	struct relay_gc_msg *m = (struct relay_gc_msg *)malloc(sizeof(*m));
	if (m == NULL) {
		/* Best effort: skipping gc only delays WAL collection. */
		say_warn("failed to allocate relay gc message");
		return 0;
	}
	cmsg_init(&m->msg, route);
	m->relay = relay;
	vclock_copy(&m->vclock, &relay->r->vclock);
	/*
	 * Do not invoke garbage collection until the replica
	 * confirms that it has received data stored in the
	 * sent xlog.
	 */
	stailq_add_tail_entry(&relay->pending_gc, m, in_pending);
	return 0;
}
661

662
/**
 * Invoke pending garbage collection requests.
 *
 * This function schedules the most recent gc message whose
 * vclock is less than or equal to the given one. Older
 * messages are discarded as their job will be done by the
 * scheduled message anyway.
 */
static inline void
relay_schedule_pending_gc(struct relay *relay, const struct vclock *vclock)
{
	struct relay_gc_msg *curr, *next, *gc_msg = NULL;
	stailq_foreach_entry_safe(curr, next, &relay->pending_gc, in_pending) {
		/*
		 * We may delete a WAL file only if its vclock is
		 * less than or equal to the vclock acknowledged by
		 * the replica. Even if the replica's signature is
		 * greater, but the vclocks are incomparable, we
		 * must not delete the WAL, because there may still
		 * be rows not applied by the replica in it while
		 * the greater signatures is due to changes pulled
		 * from other members of the cluster.
		 */
		if (vclock_compare_ignore0(&curr->vclock, vclock) > 0)
			break;
		stailq_shift(&relay->pending_gc);
		/* Drop the previous candidate — this one supersedes it. */
		free(gc_msg);
		gc_msg = curr;
	}
	if (gc_msg != NULL)
		cpipe_push(&relay->tx_pipe, &gc_msg->msg);
}
694

695
static void
696
relay_set_error(struct relay *relay, struct error *e)
2,495✔
697
{
698
        /* Don't override existing error. */
699
        if (diag_is_empty(&relay->diag))
2,495✔
700
                diag_set_error(&relay->diag, e);
2,124✔
701
}
2,494✔
702

703
/**
 * WAL watcher callback: relay the rows that appeared in the WAL
 * since the last invocation. A recovery error is recorded on the
 * relay and cancels the current fiber.
 */
static void
relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
{
	struct relay *relay = container_of(watcher, struct relay, wal_watcher);
	if (fiber_is_cancelled()) {
		/*
		 * The relay is exiting. Rescanning the WAL at this
		 * point would be pointless and even dangerous,
		 * because the relay could have written a packet
		 * fragment to the socket before being cancelled
		 * so that writing another row to the socket would
		 * lead to corrupted replication stream and, as
		 * a result, permanent replication breakdown.
		 */
		return;
	}
	try {
		recover_remaining_wals(relay->r, &relay->stream, NULL,
				       (events & WAL_EVENT_ROTATE) != 0);
	} catch (Exception *e) {
		relay_set_error(relay, e);
		fiber_cancel(fiber());
	}
}
727

728
/**
 * Process the last received ACK from applier.
 * @param relay Relay whose last_recv_ack has just been updated.
 * @param tm    Timestamp from the ACK row; 0 means "no timestamp".
 */
static void
relay_process_ack(struct relay *relay, double tm)
{
	if (tm == 0)
		return;
	/*
	 * Replica sends us last replicated transaction timestamp which is
	 * needed for relay lag monitoring. Note that this transaction has been
	 * written to WAL with our current realtime clock value, thus when it
	 * get reported back we can compute time spent regardless of the clock
	 * value on remote replica. Update the lag only when the timestamp
	 * corresponds to some transaction the replica has just applied, i.e.
	 * received vclock is bigger than the previous one.
	 */
	const struct vclock *prev_vclock = &relay->status_msg.vclock;
	const struct vclock *next_vclock = &relay->last_recv_ack.vclock;
	/*
	 * Both vclocks are confirmed by the same applier, sequentially. They
	 * can't go down.
	 */
	assert(vclock_compare(prev_vclock, next_vclock) <= 0);
	if (vclock_compare_ignore0(prev_vclock, next_vclock) < 0)
		relay->txn_lag = ev_now(loop()) - tm;
}
753

754
/*
 * Relay reader fiber function.
 * Read xrow encoded vclocks sent by the replica. On any read or
 * decode error the error is recorded on the relay and the main
 * relay fiber is cancelled.
 */
int
relay_reader_f(va_list ap)
{
	struct relay *relay = va_arg(ap, struct relay *);
	struct fiber *relay_f = va_arg(ap, struct fiber *);

	struct ibuf ibuf;
	ibuf_create(&ibuf, &cord()->slabc, 1024);
	struct applier_heartbeat *last_recv_ack = &relay->last_recv_ack;
	try {
		while (!fiber_is_cancelled()) {
			FiberGCChecker gc_check;
			struct xrow_header xrow;
			ERROR_INJECT_YIELD(ERRINJ_RELAY_READ_ACK_DELAY);
			coio_read_xrow_timeout_xc(relay->io, &ibuf, &xrow,
					replication_disconnect_timeout());
			xrow_decode_applier_heartbeat_xc(&xrow, last_recv_ack);
			relay_process_ack(relay, xrow.tm);
			/* Wake up whoever waits for a fresh ACK. */
			fiber_cond_signal(&relay->reader_cond);
		}
	} catch (Exception *e) {
		relay_set_error(relay, e);
		/* Take the main relay fiber down with us. */
		fiber_cancel(relay_f);
	}
	ibuf_destroy(&ibuf);
	return 0;
}
785

786
/**
 * Send a heartbeat message over a connected relay.
 * Bumps the outgoing vclock_sync and flushes immediately; a flush
 * failure is recorded on the relay and cancels the current fiber.
 */
static inline void
relay_send_heartbeat(struct relay *relay)
{
	struct xrow_header row;
	++relay->last_sent_ack.vclock_sync;
	RegionGuard region_guard(&fiber()->gc);
	xrow_encode_relay_heartbeat(&row, &relay->last_sent_ack);
	/*
	 * Do not encode timestamp if this heartbeat is sent in between
	 * data rows so as to not affect replica's upstream lag.
	 */
	if (relay->last_row_time > relay->last_heartbeat_time)
		row.tm = 0;
	else
		row.tm = ev_now(loop());
	row.replica_id = instance_id;
	relay->last_heartbeat_time = ev_monotonic_now(loop());
	relay_send(relay, &row);
	if (relay_flush(relay) < 0) {
		relay_set_error(relay, diag_last_error(diag_get()));
		fiber_cancel(fiber());
	}
	relay->need_new_vclock_sync = false;
}
813

814
/**
815
 * Check whether a new heartbeat message should be sent and send it
816
 * in case it's required.
817
 */
818
static inline void
819
relay_send_heartbeat_on_timeout(struct relay *relay)
2,076,220✔
820
{
821
        double now = ev_monotonic_now(loop());
2,076,220!
822
        /*
823
         * Do not send a message when it was just sent or when tx thread is
824
         * unresponsive.
825
         * Waiting for a replication_disconnect_timeout before declaring tx
826
         * thread unresponsive helps fight leader disruptions: followers start
827
         * counting down replication_disconnect_timeout only when the same
828
         * timeout already passes on the leader, meaning tx thread hang will be
829
         * noticed twice as late compared to a usual failure, like a crash or
830
         * network error. IOW transient hangs are tolerated without leader
831
         * switchover.
832
         */
833
        if (!relay->need_new_vclock_sync &&
4,151,760✔
834
            (now - relay->last_heartbeat_time <= replication_timeout ||
2,075,830✔
835
            now - relay->tx_seen_time >= replication_disconnect_timeout()))
104,966✔
836
                return;
2,053,000✔
837
        relay_send_heartbeat(relay);
22,925✔
838
}
839

840
static void
841
relay_push_raft_msg(struct relay *relay)
5,069✔
842
{
843
        bool is_raft_enabled = relay->tx.is_paired && !relay->replica->anon &&
7,016✔
844
                               relay->version_id >= version_id(2, 6, 0);
1,947✔
845
        if (!is_raft_enabled || relay->tx.is_raft_push_sent)
5,069✔
846
                return;
3,221✔
847
        struct relay_raft_msg *msg =
1,848✔
848
                &relay->tx.raft_msgs[relay->tx.raft_ready_msg];
1,848✔
849
        cpipe_push(&relay->relay_pipe, &msg->base);
1,848✔
850
        relay->tx.raft_ready_msg = (relay->tx.raft_ready_msg + 1) % 2;
1,848✔
851
        relay->tx.is_raft_push_sent = true;
1,848✔
852
        relay->tx.is_raft_push_pending = false;
1,848✔
853
}
854

855
/** A notification that relay thread is ready to process cbus messages. */
856
static void
857
relay_thread_on_start(void *arg)
2,126✔
858
{
859
        struct relay *relay = (struct relay *)arg;
2,126✔
860
        relay->tx.is_paired = true;
2,126✔
861
        if (!relay->replica->anon && relay->version_id >= version_id(2, 6, 0)) {
2,126✔
862
                /*
863
                 * Send saved raft message as soon as relay becomes operational.
864
                 */
865
                if (relay->tx.is_raft_push_pending)
2,059✔
866
                        relay_push_raft_msg(relay);
492✔
867
        }
868
        trigger_run(&replicaset.on_relay_thread_start, relay->replica);
2,126✔
869
}
2,126✔
870

871
/** A notification about relay detach from the cbus. */
872
static void
873
relay_thread_on_stop(void *arg)
2,125✔
874
{
875
        struct relay *relay = (struct relay *)arg;
2,125✔
876
        relay->tx.is_paired = false;
2,125✔
877
}
2,125✔
878

879
/**
 * The trigger_vclock_sync call message. Heap-allocated by the caller;
 * freed either by the caller on success or by the free callback when
 * the cbus call times out.
 */
struct relay_trigger_vclock_sync_msg {
	/** Parent cbus message. */
	struct cbus_call_msg base;
	/** The queried relay. */
	struct relay *relay;
	/** Sync value returned from relay. */
	uint64_t vclock_sync;
};
888

889
/** A callback to free the message once it returns to tx thread. */
static int
relay_trigger_vclock_sync_msg_free(struct cbus_call_msg *base)
{
	/* The message was heap-allocated by relay_trigger_vclock_sync(). */
	free(base);
	return 0;
}
896

897
/** Relay side of the trigger_vclock_sync call. */
898
static int
899
relay_trigger_vclock_sync_f(struct cbus_call_msg *msg)
12✔
900
{
901
        struct relay_trigger_vclock_sync_msg *m =
12✔
902
                (struct relay_trigger_vclock_sync_msg *)msg;
903
        m->vclock_sync = m->relay->last_sent_ack.vclock_sync + 1;
12✔
904
        m->relay->need_new_vclock_sync = true;
12✔
905
        return 0;
12✔
906
}
907

908
/**
 * Ask the relay thread to send a heartbeat with a new vclock sync and
 * return that sync value in @a vclock_sync.
 * Returns 0 on success, -1 if the cbus call failed or timed out.
 * NOTE(review): when the relay is not paired yet, 0 is returned with
 * *vclock_sync left untouched - callers presumably handle that; confirm.
 */
int
relay_trigger_vclock_sync(struct relay *relay, uint64_t *vclock_sync,
			  double deadline)
{
	if (!relay->tx.is_paired)
		return 0;
	/*
	 * Heap-allocated because on timeout the message outlives this
	 * function and is freed by relay_trigger_vclock_sync_msg_free()
	 * once it returns to the tx thread.
	 */
	struct relay_trigger_vclock_sync_msg *msg =
		(struct relay_trigger_vclock_sync_msg *)xmalloc(sizeof(*msg));
	msg->relay = relay;
	double timeout = deadline - ev_monotonic_now(loop());
	if (cbus_call_timeout(&relay->relay_pipe, &relay->tx_pipe, &msg->base,
			      relay_trigger_vclock_sync_f,
			      relay_trigger_vclock_sync_msg_free, timeout) < 0)
		return -1;
	*vclock_sync = msg->vclock_sync;
	/* The call completed in time - the message is ours to free. */
	free(msg);
	return 0;
}
926

927
/**
 * Push a status update to the tx thread if anything reportable changed:
 * the acked vclock advanced, tx was idle longer than the replication
 * timeout, or the replica confirmed a new vclock sync.
 */
static void
relay_check_status_needs_update(struct relay *relay)
{
	struct applier_heartbeat *last_recv_ack = &relay->last_recv_ack;
	struct relay_status_msg *status_msg = &relay->status_msg;
	/*
	 * A non-NULL route means the single status message is still in
	 * flight to/from tx - do not touch it until it comes back.
	 */
	if (status_msg->msg.route != NULL)
		return;

	struct vclock *send_vclock;
	if (relay->version_id < version_id(1, 7, 4))
		/* Old replicas do not send acks - use the read position. */
		send_vclock = &relay->r->vclock;
	else
		send_vclock = &last_recv_ack->vclock;

	/* Collect xlog files received by the replica. */
	relay_schedule_pending_gc(relay, send_vclock);

	double tx_idle = ev_monotonic_now(loop()) - relay->tx_seen_time;
	/* Nothing changed and tx is responsive - skip the round-trip. */
	if (vclock_sum(&status_msg->vclock) ==
	    vclock_sum(send_vclock) && tx_idle <= replication_timeout &&
	    status_msg->vclock_sync == last_recv_ack->vclock_sync)
		return;
	static const struct cmsg_hop route[] = {
		{tx_status_update, NULL}
	};
	cmsg_init(&status_msg->msg, route);
	vclock_copy(&status_msg->vclock, send_vclock);
	status_msg->txn_lag = relay->txn_lag;
	status_msg->relay = relay;
	status_msg->term = last_recv_ack->term;
	status_msg->vclock_sync = last_recv_ack->vclock_sync;
	cpipe_push(&relay->tx_pipe, &status_msg->msg);
}
960

961
static void
962
relay_subscribe_update(struct relay *relay)
2,076,480✔
963
{
964
        /*
965
         * The fiber can be woken by IO cancel, by a timeout of status messaging
966
         * or by an acknowledge to status message. Handle cbus messages first.
967
         */
968
        struct errinj *inj = errinj(ERRINJ_RELAY_FROM_TX_DELAY, ERRINJ_BOOL);
2,076,480!
969
        if (inj == NULL || !inj->bparam)
2,076,480!
970
                cbus_process(&relay->tx_endpoint);
2,076,470✔
971
        relay_send_heartbeat_on_timeout(relay);
2,076,340✔
972
        relay_check_status_needs_update(relay);
2,075,830✔
973
}
2,073,620✔
974

975
/**
 * Relay cord main function for the SUBSCRIBE stage: pairs with the tx
 * thread, sets up the WAL watcher and the ack reader fiber, and keeps
 * feeding the replica until the connection breaks or an error occurs.
 */
980
static int
relay_subscribe_f(va_list ap)
{
	struct relay *relay = va_arg(ap, struct relay *);

	relay_cord_init(relay);

	/* Pair with the tx thread; endpoint names are unique per relay. */
	cbus_endpoint_create(&relay->tx_endpoint,
			     tt_sprintf("relay_tx_%p", relay),
			     fiber_schedule_cb, fiber());
	cbus_pair("tx", relay->tx_endpoint.name, &relay->tx_pipe,
		  &relay->relay_pipe, relay_thread_on_start, relay,
		  cbus_process);

	cbus_endpoint_create(&relay->wal_endpoint,
			     tt_sprintf("relay_wal_%p", relay),
			     fiber_schedule_cb, fiber());

	/*
	 * Setup garbage collection trigger.
	 * Not needed for anonymous replicas, since they
	 * aren't registered with gc at all.
	 */
	struct trigger on_close_log;
	trigger_create(&on_close_log, relay_on_close_log_f, relay, NULL);
	trigger_add(&relay->r->on_close_log, &on_close_log);

	/* Setup WAL watcher for sending new rows to the replica. */
	struct errinj *inj = errinj(ERRINJ_RELAY_WAL_START_DELAY, ERRINJ_BOOL);
	while (inj != NULL && inj->bparam) {
		fiber_sleep(0.01);
		xstream_yield(&relay->stream);
	}
	wal_set_watcher(&relay->wal_watcher, relay->wal_endpoint.name,
			relay_process_wal_event, cbus_process);

	/* Start fiber for receiving replica acks. */
	char name[FIBER_NAME_MAX];
	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader");
	struct fiber *reader = fiber_new_xc(name, relay_reader_f);
	fiber_set_joinable(reader, true);
	fiber_start(reader, relay, fiber());

	/*
	 * If the replica happens to be up to date on subscribe,
	 * don't wait for timeout to happen - send a heartbeat
	 * message right away to update the replication lag as
	 * soon as possible.
	 */
	relay_send_heartbeat(relay);

	/*
	 * Run the event loop until the connection is broken
	 * or an error occurs.
	 */
	inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL, ERRINJ_DOUBLE);
	while (!fiber_is_cancelled()) {
		FiberGCChecker gc_check;
		double timeout = replication_timeout;
		if (inj != NULL && inj->dparam != 0)
			timeout = inj->dparam;

		/* Sleep until an ack arrives or the report timeout fires. */
		fiber_cond_wait_deadline(&relay->reader_cond,
					 relay->last_row_time + timeout);
		cbus_process(&relay->wal_endpoint);
		relay_subscribe_update(relay);
		if (relay_flush(relay) < 0) {
			relay_set_error(relay, diag_last_error(diag_get()));
			fiber_cancel(fiber());
		}
	}

	/*
	 * Clear garbage collector trigger and WAL watcher.
	 * trigger_clear() does nothing in case the triggers
	 * aren't set (the replica is anonymous).
	 */
	trigger_clear(&on_close_log);
	wal_clear_watcher(&relay->wal_watcher, cbus_process);

	/* Join ack reader fiber. */
	fiber_cancel(reader);
	fiber_join(reader);

	/* Destroy cpipe to tx. */
	cbus_unpair(&relay->tx_pipe, &relay->relay_pipe,
		    relay_thread_on_stop, relay, cbus_process);
	cbus_endpoint_destroy(&relay->wal_endpoint, cbus_process);
	cbus_endpoint_destroy(&relay->tx_endpoint, cbus_process);

	relay_exit(relay);

	/*
	 * Log the error that caused the relay to break the loop.
	 * Don't clear the error for status reporting.
	 */
	assert(!diag_is_empty(&relay->diag));
	diag_set_error(diag_get(), diag_last_error(&relay->diag));
	diag_log();
	say_info("exiting the relay loop");

	return -1;
}
1083

1084
/**
 * Replication acceptor fiber handler. Runs in the tx thread: prepares
 * the relay state for SUBSCRIBE and then starts a separate cord running
 * relay_subscribe_f(), joining it until the relay terminates.
 * Raises on error.
 */
void
relay_subscribe(struct replica *replica, struct iostream *io, uint64_t sync,
		const struct vclock *start_vclock, uint32_t replica_version_id,
		uint32_t replica_id_filter, uint64_t sent_raft_term)
{
	assert(replica->anon || replica->id != REPLICA_ID_NIL);
	struct relay *relay = replica->relay;
	assert(relay->state != RELAY_FOLLOW);
	/* Peers without raft support never need to wait on a raft term. */
	if (replica_version_id < version_id(2, 6, 0) || replica->anon)
		sent_raft_term = UINT64_MAX;
	relay_start(relay, io, sync, relay_process_row,
		    relay_subscribe_on_wal_yield_f, sent_raft_term);
	replica_on_relay_follow(replica);
	/* Guarantee teardown on any exit path, including exceptions. */
	auto relay_guard = make_scoped_guard([=] {
		relay_stop(relay);
		replica_on_relay_stop(replica);
	});

	vclock_copy(&relay->local_vclock_at_subscribe, instance_vclock);
	/*
	 * Save the first vclock as 'received'. Because it was really received.
	 */
	vclock_copy_ignore0(&relay->last_recv_ack.vclock, start_vclock);
	relay->r = recovery_new(wal_dir(), false, start_vclock);
	vclock_copy_ignore0(&relay->tx.vclock, start_vclock);
	relay->version_id = replica_version_id;
	relay->id_filter |= replica_id_filter;
	relay->subscribe_fiber = fiber();

	/* The relay runs in its own cord; block here until it finishes. */
	struct cord cord;
	int rc = cord_costart(&cord, "subscribe", relay_subscribe_f, relay);
	if (rc == 0)
		rc = cord_cojoin(&cord);
	if (rc != 0)
		diag_raise();
}
×
1121

1122
static void
1123
relay_send(struct relay *relay, struct xrow_header *packet)
401,890✔
1124
{
1125
        ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY);
401,887!
1126

1127
        struct xrow_stream *stream = &relay->xrow_stream;
400,787✔
1128
        packet->sync = relay->sync;
400,787✔
1129
        relay->last_row_time = ev_monotonic_now(loop());
400,787!
1130
        xrow_stream_write(stream, packet);
400,784✔
1131
}
400,748✔
1132

1133
void
1134
relay_filter_raft(struct xrow_header *packet, uint32_t version)
4,642✔
1135
{
1136
        assert(iproto_type_is_raft_request(packet->type));
4,642!
1137
        if (version > version_id(3, 2, 1) ||
4,647✔
1138
            (version > version_id(2, 11, 5) && version < version_id(3, 0, 0)))
5✔
1139
                return;
4,640✔
1140
        /**
1141
         * Until Tarantool 3.2.2 all raft requests were sent with GROUP_LOCAL
1142
         * id. In order not to break the upgrade process, raft rows are still
1143
         * sent as local to old replicas. This was also backported to 2.11.6.
1144
         */
1145
        packet->group_id = GROUP_LOCAL;
2✔
1146
}
1147

1148
/**
 * Send a raft request row: apply the version-dependent group-id fixup
 * and queue the row into the outgoing stream.
 */
static void
relay_send_raft(struct relay *relay, struct xrow_header *packet)
{
	assert(iproto_type_is_raft_request(packet->type));
	/* Must run before the send - it may rewrite packet->group_id. */
	relay_filter_raft(packet, relay->version_id);
	relay_send(relay, packet);
}
1155

1156
static void
1157
relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
325,296✔
1158
{
1159
        struct relay *relay = container_of(stream, struct relay, stream);
325,296✔
1160
        if (iproto_type_is_raft_request(row->type))
325,296✔
1161
                return relay_send_raft(relay, row);
736✔
1162
        /*
1163
         * Ignore replica local requests as we don't need to promote
1164
         * vclock while sending a snapshot.
1165
         */
1166
        if (row->group_id != GROUP_LOCAL)
324,556!
1167
                relay_send(relay, row);
324,557✔
1168
        if (relay_check_flush(relay) < 0)
324,550✔
1169
                diag_raise();
1✔
1170
}
1171

1172
/**
 * Send a Raft message to the peer. This is done asynchronously, out of scope
 * of recover_remaining_wals loop. Runs in the relay thread as the first hop
 * of the raft message route.
 */
static void
relay_raft_msg_push(struct cmsg *base)
{
	struct relay_raft_msg *msg = (struct relay_raft_msg *)base;
	struct relay *relay = msg->relay;
	struct xrow_header row;
	/* Encoding allocates on the fiber region - truncate it on exit. */
	RegionGuard region_guard(&fiber()->gc);
	xrow_encode_raft(&row, &fiber()->gc, &msg->req);
	relay_send_raft(relay, &row);
	if (relay_flush(relay) < 0) {
		relay_set_error(relay, diag_last_error(diag_get()));
		fiber_cancel(fiber());
	}
	/*
	 * Remember the term even if the flush failed - the row was
	 * queued; PROMOTE filtering waits on this value.
	 */
	relay->sent_raft_term = msg->req.term;
}
1191

1192
static void
1193
tx_raft_msg_return(struct cmsg *base)
1,847✔
1194
{
1195
        struct relay_raft_msg *msg = (struct relay_raft_msg *)base;
1,847✔
1196
        msg->relay->tx.is_raft_push_sent = false;
1,847✔
1197
        if (msg->relay->tx.is_raft_push_pending)
1,847✔
1198
                relay_push_raft_msg(msg->relay);
95✔
1199
}
1,847✔
1200

1201
void
1202
relay_push_raft(struct relay *relay, const struct raft_request *req)
4,482✔
1203
{
1204
        struct relay_raft_msg *msg =
4,482✔
1205
                &relay->tx.raft_msgs[relay->tx.raft_ready_msg];
4,482✔
1206
        /*
1207
         * Overwrite the request in raft_ready_msg. Only the latest raft request
1208
         * is saved.
1209
         */
1210
        msg->req = *req;
4,482✔
1211
        if (req->vclock != NULL) {
4,482✔
1212
                msg->req.vclock = &msg->vclock;
275✔
1213
                vclock_copy(&msg->vclock, req->vclock);
275✔
1214
        }
1215
        msg->route[0].f = relay_raft_msg_push;
4,482✔
1216
        msg->route[0].pipe = &relay->tx_pipe;
4,482✔
1217
        msg->route[1].f = tx_raft_msg_return;
4,482✔
1218
        msg->route[1].pipe = NULL;
4,482✔
1219
        cmsg_init(&msg->base, msg->route);
4,482✔
1220
        msg->relay = relay;
4,482✔
1221
        relay->tx.is_raft_push_pending = true;
4,482✔
1222
        relay_push_raft_msg(relay);
4,482✔
1223
}
4,482✔
1224

1225
void
1226
relay_cancel(struct relay *relay)
1,035✔
1227
{
1228
        if (relay->subscribe_fiber != NULL)
1,035✔
1229
                fiber_cancel(relay->subscribe_fiber);
13✔
1230
}
1,035✔
1231

1232
/**
 * Check if a row should be sent to a remote replica.
 * Returns true if the row must be relayed (possibly rewritten to a NOP),
 * false if it must be dropped. May yield while waiting for the raft term
 * preceding a PROMOTE/DEMOTE to be sent.
 */
static bool
relay_filter_row(struct relay *relay, struct xrow_header *packet)
{
	assert(fiber()->f == relay_subscribe_f ||
	       fiber()->f == relay_final_join_f);
	bool is_subscribe = fiber()->f == relay_subscribe_f;
	/*
	 * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
	 * request. If this is FINAL JOIN (i.e. relay->replica is NULL),
	 * we must relay all rows, even those originating from the replica
	 * itself (there may be such rows if this is rebootstrap). If this
	 * SUBSCRIBE, only send a row if it is not from the same replica
	 * (i.e. don't send replica's own rows back) or if this row is
	 * missing on the other side (i.e. in case of sudden power-loss,
	 * data was not written to WAL, so remote master can't recover
	 * it). In the latter case packet's LSN is less than or equal to
	 * local master's LSN at the moment it received 'SUBSCRIBE' request.
	 */
	if ((1 << packet->replica_id & relay->id_filter) != 0) {
		return false;
	} else if (is_subscribe && packet->replica_id == relay->replica->id &&
		   packet->lsn > vclock_get(&relay->local_vclock_at_subscribe,
					    packet->replica_id)) {
		/*
		 * Knowing that recovery goes for LSNs in ascending order,
		 * filter out this replica id to skip the expensive check above.
		 * It'll always be true from now on for this relay.
		 */
		relay->id_filter |= 1 << packet->replica_id;
		return false;
	}

	if (packet->group_id == GROUP_LOCAL) {
		/*
		 * All packets with REPLICA_ID_NIL are filtered out by
		 * id_filter. The remaining ones are from old tarantool
		 * versions, when local rows went by normal replica id. We have
		 * to relay them as NOPs for the sake of vclock convergence.
		 */
		assert(packet->replica_id != REPLICA_ID_NIL);
		packet->type = IPROTO_NOP;
		packet->group_id = GROUP_DEFAULT;
		packet->bodycnt = 0;
	}

	/*
	 * This is not a filter, but still seems to be the best place for this
	 * code. PROMOTE/DEMOTE should be sent only after corresponding RAFT
	 * term was already sent. We assume that PROMOTE/DEMOTE will arrive
	 * after RAFT term, otherwise something might break.
	 */
	if (iproto_type_is_promote_request(packet->type)) {
		struct synchro_request req;
		if (xrow_decode_synchro(packet, &req, NULL) != 0)
			diag_raise();
		/* Block until the raft push for req.term has gone out. */
		while (relay->sent_raft_term < req.term) {
			if (fiber_is_cancelled()) {
				diag_set(FiberIsCancelled);
				diag_raise();
			}
			cbus_process(&relay->tx_endpoint);
			if (relay->sent_raft_term >= req.term)
				break;
			fiber_yield();
		}
	}
	return true;
}
1301

1302
/**
 * A helper struct to collect all rows to be sent in scope of a transaction
 * into a single list. Instances live on the relay lsregion and are
 * reclaimed wholesale after the transaction is sent.
 */
struct relay_row {
	/** A transaction row. */
	struct xrow_header row;
	/** A link in all transaction rows. */
	struct rlist in_tx;
};
1312

1313
/**
 * Save a single transaction row for the future use. The row header and
 * its body are deep-copied onto the relay lsregion so they survive until
 * the whole transaction is sent by relay_send_tx().
 */
static void
relay_save_row(struct relay *relay, struct xrow_header *packet)
{
	/* Each lsregion allocation needs a monotonically growing id. */
	struct relay_row *tx_row = xlsregion_alloc_object(&relay->lsregion,
							  ++relay->lsr_id,
							  struct relay_row);
	struct xrow_header *row = &tx_row->row;
	*row = *packet;
	if (packet->bodycnt == 1) {
		/* Copy the body - the original buffer may be reused. */
		size_t len = packet->body[0].iov_len;
		void *new_body = xlsregion_alloc(&relay->lsregion, len,
						 ++relay->lsr_id);
		memcpy(new_body, packet->body[0].iov_base, len);
		row->body[0].iov_base = new_body;
	}
	rlist_add_tail_entry(&relay->current_tx, tx_row, in_tx);
}
1331

1332
/**
 * Send a full transaction to the replica: flush every collected row,
 * then release the lsregion memory backing the row copies.
 */
static void
relay_send_tx(struct relay *relay)
{
	struct relay_row *item;

	rlist_foreach_entry(item, &relay->current_tx, in_tx) {
		struct xrow_header *packet = &item->row;

		/* Test-only injection of a deliberately broken LSN. */
		struct errinj *inj = errinj(ERRINJ_RELAY_BREAK_LSN,
					    ERRINJ_INT);
		if (inj != NULL && packet->lsn == inj->iparam) {
			packet->lsn = inj->iparam - 1;
			packet->tsn = packet->lsn;
			say_warn("injected broken lsn: %lld",
				 (long long) packet->lsn);
		}
		relay_send(relay, packet);
	}
	if (relay_check_flush(relay) < 0)
		diag_raise();

	/* The rows are out - drop the list and free their lsregion memory. */
	rlist_create(&relay->current_tx);
	lsregion_gc(&relay->lsregion, relay->lsr_id);
}
1357

1358
/**
 * Xstream callback for SUBSCRIBE/FINAL JOIN: accumulate rows of one
 * transaction (matched by tsn) and send them as a unit on commit.
 * relay->read_tsn == 0 means no transaction is being collected.
 */
static void
relay_process_row(struct xstream *stream, struct xrow_header *packet)
{
	struct relay *relay = container_of(stream, struct relay, stream);
	struct rlist *current_tx = &relay->current_tx;

	if (relay->read_tsn == 0) {
		/* First row of a new transaction. */
		rlist_create(current_tx);
		relay->read_tsn = packet->tsn;
	} else if (relay->read_tsn != packet->tsn) {
		tnt_raise(ClientError, ER_PROTOCOL, "Found a new transaction "
			  "with previous one not yet committed");
	}

	if (!packet->is_commit) {
		/* Middle of a transaction - just buffer the row. */
		if (relay_filter_row(relay, packet)) {
			relay_save_row(relay, packet);
		}
		return;
	}
	/* Commit row: finalize and ship the transaction. */
	if (relay_filter_row(relay, packet)) {
		relay_save_row(relay, packet);
	} else if (rlist_empty(current_tx)) {
		/* Entirely filtered-out transaction - nothing to send. */
		relay->read_tsn = 0;
		return;
	} else {
		/*
		 * The commit row was filtered, but earlier rows were kept:
		 * transfer the commit flags to the last buffered row so the
		 * replica still sees the transaction boundary.
		 */
		rlist_last_entry(current_tx, struct relay_row,
				 in_tx)->row.flags = packet->flags;
	}
	relay_send_tx(relay);
	relay->read_tsn = 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc