• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / tarantool / 12352793795

16 Dec 2024 12:37PM UTC coverage: 87.345% (-0.009%) from 87.354%
12352793795

push

github

Buristan
luajit: bump new version

* test: support number value of tag in LuaJIT-tests
* test: fix LuaJIT-tests for old libc version
* test: small fixes in gh-8594-sysprof-ffunc-crash
* test: skip sysprof tests with LUAJIT_DISABLE_SYSPROF
* test: skip <string/dump.lua> test for table bump
* test: fix build for macOS Sequoia 15.0
* test: fix Clang warning in LuaJIT-tests
* cmake: fix build for Alpine

NO_CHANGELOG=LuaJIT submodule bump
NO_DOC=LuaJIT submodule bump
NO_TEST=LuaJIT submodule bump

69661 of 123524 branches covered (56.39%)

102604 of 117470 relevant lines covered (87.34%)

2724254.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.15
/src/box/xlog.c
1
/*
2
 * Copyright 2010-2016, Tarantool AUTHORS, please see AUTHORS file.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 * 1. Redistributions of source code must retain the above
9
 *    copyright notice, this list of conditions and the
10
 *    following disclaimer.
11
 *
12
 * 2. Redistributions in binary form must reproduce the above
13
 *    copyright notice, this list of conditions and the following
14
 *    disclaimer in the documentation and/or other materials
15
 *    provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
18
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
19
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
21
 * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
22
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
25
 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
28
 * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
29
 * SUCH DAMAGE.
30
 */
31
#include "xlog.h"
32
#include <dirent.h>
33
#include <fcntl.h>
34
#include <ctype.h>
35

36
#include "fiber.h"
37
#include "exception.h"
38
#include "crc32.h"
39
#include "fio.h"
40
#include <tarantool_eio.h>
41
#include <msgpuck.h>
42

43
#include "coio_task.h"
44
#include "tt_static.h"
45
#include "error.h"
46
#include "xrow.h"
47
#include "iproto_constants.h"
48
#include "errinj.h"
49
#include "salad/grp_alloc.h"
50
#include "trivia/util.h"
51
#include "retention_period.h"
52

53
/*
 * FALLOC_FL_KEEP_SIZE has been part of fallocate() since the call
 * first appeared, yet for a while glibc headers did not define it.
 * Provide the definition ourselves when fallocate() is available
 * and the system headers do not.
 */
#if defined(HAVE_FALLOCATE) && !defined(FALLOC_FL_KEEP_SIZE)
# define FALLOC_FL_KEEP_SIZE 0x01
#endif
63

64
/*
65
 * marker is MsgPack fixext2
66
 * +--------+--------+--------+--------+
67
 * |  0xd5  |  type  |       data      |
68
 * +--------+--------+--------+--------+
69
 */
70
typedef uint32_t log_magic_t;
71

72
static const log_magic_t row_marker = mp_bswap_u32(0xd5ba0bab); /* host byte order */
73
static const log_magic_t zrow_marker = mp_bswap_u32(0xd5ba0bba); /* host byte order */
74
static const log_magic_t eof_marker = mp_bswap_u32(0xd510aded); /* host byte order */
75

76
enum {
        /**
         * Once the xlog_tx write buffer grows to this size, the
         * flush is no longer delayed and a write is issued.
         * The same value serves as the default slab size of the
         * slab cache, which is why it has to be a power of 2.
         */
        XLOG_TX_AUTOCOMMIT_THRESHOLD = 128 * 1024,
        /**
         * The output buffer is compressed before being written to
         * disk only when it is at least this large: for smaller
         * buffers compression costs CPU without a sizable gain.
         * This could become a configuration option.
         */
        XLOG_TX_COMPRESS_THRESHOLD = 2 * 1024,
};
94

95
const struct xlog_opts xlog_opts_default = {
96
        .rate_limit = 0,
97
        .sync_interval = 0,
98
        .free_cache = false,
99
        .sync_is_async = false,
100
        .no_compression = false,
101
};
102

103
/* {{{ struct xlog_meta */
104

105
enum {
106
        /*
107
         * The maximum length of xlog meta
108
         *
109
         * @sa xlog_meta_parse()
110
         */
111
        XLOG_META_LEN_MAX = 1024 + VCLOCK_STR_LEN_MAX
112
};
113

114
#define INSTANCE_UUID_KEY "Instance"
115
#define INSTANCE_UUID_KEY_V12 "Server"
116
#define VCLOCK_KEY "VClock"
117
#define VERSION_KEY "Version"
118
#define PREV_VCLOCK_KEY "PrevVClock"
119

120
static const char v13[] = "0.13";
121
static const char v12[] = "0.12";
122

123
void
124
xlog_meta_create(struct xlog_meta *meta, const char *filetype,
22,060✔
125
                 const struct tt_uuid *instance_uuid,
126
                 const struct vclock *vclock,
127
                 const struct vclock *prev_vclock)
128
{
129
        snprintf(meta->filetype, sizeof(meta->filetype), "%s", filetype);
22,060✔
130
        meta->instance_uuid = *instance_uuid;
22,060✔
131
        if (vclock != NULL)
22,060✔
132
                vclock_copy(&meta->vclock, vclock);
16,437✔
133
        else
134
                vclock_clear(&meta->vclock);
5,623✔
135
        if (prev_vclock != NULL)
22,060✔
136
                vclock_copy(&meta->prev_vclock, prev_vclock);
6,936✔
137
        else
138
                vclock_clear(&meta->prev_vclock);
15,124✔
139
}
22,059✔
140

141
/**
142
 * Format xlog metadata into @a buf of size @a size
143
 *
144
 * @param buf buffer to use.
145
 * @param size the size of buffer. This function write at most @a size bytes.
146
 * @retval < size the number of characters printed (excluding the null byte)
147
 * @retval >=size the number of characters (excluding the null byte),
148
 *                which would have been written to the final string if
149
 *                enough space had been available.
150
 * @retval -1 error, check diag
151
 * @sa snprintf()
152
 */
153
static int
154
xlog_meta_format(const struct xlog_meta *meta, char *buf, int size)
22,051✔
155
{
156
        int total = 0;
22,051✔
157
        SNPRINT(total, snprintf, buf, size,
22,051!
158
                "%s\n"
159
                "%s\n"
160
                VERSION_KEY ": %s\n"
161
                INSTANCE_UUID_KEY ": %s\n",
162
                meta->filetype, v13, PACKAGE_VERSION,
163
                tt_uuid_str(&meta->instance_uuid));
164
        if (vclock_is_set(&meta->vclock)) {
22,052✔
165
                SNPRINT(total, snprintf, buf, size, VCLOCK_KEY ": %s\n",
16,429!
166
                        vclock_to_string(&meta->vclock));
167
        }
168
        if (vclock_is_set(&meta->prev_vclock)) {
22,052✔
169
                SNPRINT(total, snprintf, buf, size, PREV_VCLOCK_KEY ": %s\n",
6,928!
170
                        vclock_to_string(&meta->prev_vclock));
171
        }
172
        SNPRINT(total, snprintf, buf, size, "\n");
22,052!
173
        assert(total > 0);
22,052!
174
        return total;
22,052✔
175
}
176

177
/**
178
 * Parse vclock from xlog meta.
179
 */
180
static int
181
parse_vclock(const char *val, const char *val_end, struct vclock *vclock)
28,262✔
182
{
183
        if (val_end - val > VCLOCK_STR_LEN_MAX) {
28,262!
184
                diag_set(XlogError, "can't parse vclock");
×
185
                return -1;
28,263✔
186
        }
187
        char str[VCLOCK_STR_LEN_MAX + 1];
188
        memcpy(str, val, val_end - val);
28,262✔
189
        str[val_end - val] = '\0';
28,262✔
190
        size_t off = vclock_from_string(vclock, str);
28,262!
191
        ERROR_INJECT(ERRINJ_XLOG_META, { off = 1; });
28,263!
192
        if (off != 0) {
28,263!
193
                diag_set(XlogError, "invalid vclock at "
×
194
                         "offset %zd", off);
195
                return -1;
×
196
        }
197
        return 0;
28,263✔
198
}
199

200
/**
 * Check whether the key [@a key, @a key_end) is exactly equal to
 * the null-terminated string @a str.
 */
static inline bool
xlog_meta_key_equal(const char *key, const char *key_end, const char *str)
{
        size_t str_len = strlen(str);
        if ((size_t)(key_end - key) != str_len)
                return false;
        return memcmp(key, str, str_len) == 0;
}
206

207
/**
208
 * Parse xlog meta from buffer, update buffer read
209
 * position in case of success
210
 *
211
 * @retval 0 for success
212
 * @retval -1 for parse error
213
 * @retval 1 if buffer hasn't enough data
214
 */
215
static ssize_t
216
xlog_meta_parse(struct xlog_meta *meta, const char **data,
22,763✔
217
                const char *data_end)
218
{
219
        memset(meta, 0, sizeof(*meta));
22,763✔
220
        const char *end = (const char *)memmem(*data, data_end - *data,
22,763✔
221
                                               "\n\n", 2);
222
        if (end == NULL)
22,763✔
223
                return 1;
22,763✔
224
        ++end; /* include the trailing \n to simplify the checks */
22,750✔
225
        const char *pos = (const char *)*data;
22,750✔
226

227
        /*
228
         * Parse filetype, i.e "SNAP" or "XLOG"
229
         */
230
        const char *eol = (const char *)memchr(pos, '\n', end - pos);
22,750✔
231
        if (eol == end || (eol - pos) >= (ptrdiff_t) sizeof(meta->filetype)) {
22,750!
232
                diag_set(XlogError, "failed to parse xlog type string");
×
233
                return -1;
×
234
        }
235
        memcpy(meta->filetype, pos, eol - pos);
22,750✔
236
        meta->filetype[eol - pos] = '\0';
22,750✔
237
        pos = eol + 1;
22,750✔
238
        assert(pos <= end);
22,750!
239

240
        /*
241
         * Parse version string, i.e. "0.12" or "0.13"
242
         */
243
        char version[10];
244
        eol = (const char *)memchr(pos, '\n', end - pos);
22,750✔
245
        if (eol == end || (eol - pos) >= (ptrdiff_t) sizeof(version)) {
22,750!
246
                diag_set(XlogError, "failed to parse xlog version string");
×
247
                return -1;
×
248
        }
249
        memcpy(version, pos, eol - pos);
22,750✔
250
        version[eol - pos] = '\0';
22,750✔
251
        pos = eol + 1;
22,750✔
252
        assert(pos <= end);
22,750!
253
        if (strncmp(version, v12, sizeof(v12)) != 0 &&
22,750✔
254
            strncmp(version, v13, sizeof(v13)) != 0) {
22,728✔
255
                diag_set(XlogError,
1!
256
                          "unsupported file format version %s",
257
                          version);
258
                return -1;
1✔
259
        }
260

261
        vclock_clear(&meta->vclock);
22,749!
262
        vclock_clear(&meta->prev_vclock);
22,749!
263

264
        /*
265
         * Parse "key: value" pairs
266
         */
267
        while (pos < end) {
96,483✔
268
                eol = (const char *)memchr(pos, '\n', end - pos);
73,734✔
269
                assert(eol <= end);
73,734!
270
                const char *key = pos;
73,734✔
271
                const char *key_end = (const char *)
272
                        memchr(key, ':', eol - key);
73,734✔
273
                if (key_end == NULL) {
73,734!
274
                        diag_set(XlogError, "can't extract meta value");
×
275
                        return -1;
×
276
                }
277
                const char *val = key_end + 1;
73,734✔
278
                /* Skip space after colon */
279
                while (*val == ' ' || *val == '\t')
147,466!
280
                        ++val;
73,732✔
281
                const char *val_end = eol;
73,734✔
282
                assert(val <= val_end);
73,734!
283
                pos = eol + 1;
73,734✔
284

285
                if (xlog_meta_key_equal(key, key_end, INSTANCE_UUID_KEY) ||
73,734!
286
                    xlog_meta_key_equal(key, key_end, INSTANCE_UUID_KEY_V12)) {
73,758!
287
                        /*
288
                         * Instance: <uuid>
289
                         */
290
                        if (val_end - val != UUID_STR_LEN) {
22,748!
291
                                diag_set(XlogError, "can't parse instance UUID");
×
292
                                return -1;
×
293
                        }
294
                        char uuid[UUID_STR_LEN + 1];
295
                        memcpy(uuid, val, UUID_STR_LEN);
22,748✔
296
                        uuid[UUID_STR_LEN] = '\0';
22,748✔
297
                        if (tt_uuid_from_string(uuid, &meta->instance_uuid) != 0) {
22,748!
298
                                diag_set(XlogError, "can't parse instance UUID");
×
299
                                return -1;
×
300
                        }
301
                } else if (xlog_meta_key_equal(key, key_end, VCLOCK_KEY)) {
50,982!
302
                        /*
303
                         * VClock: <vclock>
304
                         */
305
                        if (parse_vclock(val, val_end, &meta->vclock) != 0)
22,284!
306
                                return -1;
×
307
                } else if (xlog_meta_key_equal(key, key_end, PREV_VCLOCK_KEY)) {
28,698!
308
                        /*
309
                         * PrevVClock: <vclock>
310
                         */
311
                        if (parse_vclock(val, val_end, &meta->prev_vclock) != 0)
5,978!
312
                                return -1;
×
313
                } else if (xlog_meta_key_equal(key, key_end, VERSION_KEY)) {
22,721!
314
                        /* Ignore Version: for now */
315
                } else {
316
                        /*
317
                         * Unknown key
318
                         */
319
                        say_warn("Unknown meta item: '%.*s'",
×
320
                                 (int)(key_end - key), key);
321
                }
322
        }
323
        *data = end + 1; /* skip the last trailing \n of \n\n sequence */
22,749✔
324
        return 0;
22,749✔
325
}
326

327
/* struct xlog_meta }}} */
328

329
/* {{{ struct xdir */
330

331
void
332
xdir_create(struct xdir *dir, const char *dirname, enum xdir_type type,
22,037✔
333
            const struct tt_uuid *instance_uuid, const struct xlog_opts *opts)
334
{
335
        memset(dir, 0, sizeof(*dir));
22,037✔
336
        dir->opts = *opts;
22,037✔
337
        vclockset_new(&dir->index);
22,037✔
338
        /* Default mode. */
339
        dir->mode = 0660;
22,037✔
340
        dir->instance_uuid = instance_uuid;
22,037✔
341
        snprintf(dir->dirname, sizeof(dir->dirname), "%s", dirname);
22,037✔
342
        dir->open_wflags = 0;
22,037✔
343
        switch (type) {
22,037!
344
        case SNAP:
9,401✔
345
                dir->filetype = "SNAP";
9,401✔
346
                dir->filename_ext = ".snap";
9,401✔
347
                dir->suffix = INPROGRESS;
9,401✔
348
                break;
9,401✔
349
        case XLOG:
8,163✔
350
                dir->filetype = "XLOG";
8,163✔
351
                dir->filename_ext = ".xlog";
8,163✔
352
                dir->suffix = NONE;
8,163✔
353
                dir->force_recovery = true;
8,163✔
354
                break;
8,163✔
355
        case VYLOG:
4,473✔
356
                dir->filetype = "VYLOG";
4,473✔
357
                dir->filename_ext = ".vylog";
4,473✔
358
                dir->suffix = INPROGRESS;
4,473✔
359
                break;
4,473✔
360
        default:
×
361
                unreachable();
×
362
        }
363
        dir->type = type;
22,037✔
364
        dir->retention_period = 0;
22,037✔
365
}
22,037✔
366

367
/**
368
 * Delete all members from the set of vector clocks.
369
 */
370
static void
371
vclockset_reset(vclockset_t *set)
21,624✔
372
{
373
        struct vclock *vclock = vclockset_first(set);
21,624✔
374
        while (vclock != NULL) {
44,651✔
375
                struct vclock *next = vclockset_next(set, vclock);
23,029✔
376
                vclockset_remove(set, vclock);
23,029✔
377
                free(vclock);
23,028✔
378
                vclock = next;
23,028✔
379
        }
380
}
21,622✔
381

382
/**
383
 * Destroy xdir object and free memory.
384
 */
385
void
386
xdir_destroy(struct xdir *dir)
21,624✔
387
{
388
        /** Free vclock objects allocated in xdir_scan(). */
389
        vclockset_reset(&dir->index);
21,624✔
390
}
21,622✔
391

392
/**
393
 * Add a single log file to the index of all log files
394
 * in a given log directory.
395
 */
396
static inline int
397
xdir_index_file(struct xdir *dir, int64_t signature)
8,611✔
398
{
399
        /*
400
         * Open xlog and parse vclock in its text header.
401
         * The vclock stores the state of the log at the
402
         * time it is created.
403
         */
404
        struct xlog_cursor cursor;
405
        if (xdir_open_cursor(dir, signature, &cursor) < 0)
8,611!
406
                return -1;
8,611✔
407
        struct xlog_meta *meta = &cursor.meta;
8,597✔
408

409
        /*
410
         * All log files in a directory must satisfy Lamport's
411
         * eventual order: events in each log file must be
412
         * separable with consistent cuts, for example:
413
         *
414
         * log1: {1, 1, 0, 1}, log2: {1, 2, 0, 2} -- good
415
         * log2: {1, 1, 0, 1}, log2: {2, 0, 2, 0} -- bad
416
         */
417
        struct vclock *dup = vclockset_search(&dir->index, &meta->vclock);
8,597!
418
        if (dup != NULL) {
8,597!
419
                diag_set(XlogError, "%s: invalid xlog order", cursor.name);
×
420
                xlog_cursor_close(&cursor, false);
×
421
                return -1;
×
422
        }
423

424
        /*
425
         * Append the clock describing the file to the directory index.
426
         * If retention feature is available (EE), then additional memory is
427
         * allocated. Time, when xlog protection should stop is saved there.
428
         */
429
        struct vclock *vclock = retention_vclock_new();
8,597!
430
        vclock_copy(vclock, &meta->vclock);
8,597!
431
        xlog_cursor_close(&cursor, false);
8,597!
432
        vclockset_insert(&dir->index, vclock);
8,597!
433
        return 0;
8,597✔
434
}
435

436
int
437
xdir_open_cursor(struct xdir *dir, int64_t signature,
16,162✔
438
                 struct xlog_cursor *cursor)
439
{
440
        const char *filename = xdir_format_filename(dir, signature, NONE);
16,162✔
441
        int fd = open(filename, O_RDONLY);
16,163✔
442
        if (fd < 0) {
16,163!
443
                diag_set(SystemError, "failed to open '%s' file", filename);
×
444
                return -1;
×
445
        }
446
        if (xlog_cursor_openfd(cursor, fd, filename) < 0) {
16,163✔
447
                close(fd);
12✔
448
                return -1;
12✔
449
        }
450
        struct xlog_meta *meta = &cursor->meta;
16,151✔
451
        if (strcmp(meta->filetype, dir->filetype) != 0) {
16,151!
452
                xlog_cursor_close(cursor, false);
×
453
                diag_set(ClientError, ER_INVALID_XLOG_TYPE,
×
454
                         dir->filetype, meta->filetype);
455
                return -1;
×
456
        }
457
        if (!tt_uuid_is_nil(dir->instance_uuid) &&
16,151✔
458
            !tt_uuid_is_equal(dir->instance_uuid, &meta->instance_uuid)) {
14,196✔
459
                xlog_cursor_close(cursor, false);
2✔
460
                diag_set(XlogError, "%s: invalid instance UUID", filename);
2!
461
                return -1;
2✔
462
        }
463
        /*
464
         * Check the match between log file name and contents:
465
         * the sum of vector clock coordinates must be the same
466
         * as the name of the file.
467
         */
468
        int64_t signature_check = vclock_sum(&meta->vclock);
16,149✔
469
        if (signature_check != signature) {
16,149!
470
                xlog_cursor_close(cursor, false);
×
471
                diag_set(XlogError, "%s: signature check failed", filename);
×
472
                return -1;
×
473
        }
474
        return 0;
16,149✔
475
}
476

477
/**
478
 * Scan (or rescan) a directory with snapshot or write ahead logs.
479
 * Read all files matching a pattern from the directory -
480
 * the filename pattern is \d+.xlog
481
 * The name of the file is based on its vclock signature,
482
 * which is the sum of all elements in the vector clock recorded
483
 * when the file was created. Elements in the vector
484
 * reflect log sequence numbers of replicas in the asynchronous
485
 * replication set (see also _cluster system space and vclock.h
486
 * comments).
487
 *
488
 * @param dir - directory to scan
489
 * @param is_dir_required - flag set if the directory should exist
490
 *
491
 * @return:
492
 *   0 - on success or flag 'is_dir_required' was set to False in
493
 *   xdir_scan() arguments and opendir() failed with errno ENOENT;
494
 *   -1 - if opendir() failed, with other than ENOENT errno either
495
 *   when 'is_dir_required' was set to True in xdir_scan() arguments.
496
 *
497
 * This function tries to avoid re-reading a file if
498
 * it is already in the set of files "known" to the log
499
 * dir object. This is done to speed up local hot standby and
500
 * recovery_follow_local(), which periodically rescan the
501
 * directory to discover newly created logs.
502
 *
503
 * On error, this function throws an exception. If
504
 * dir->force_recovery is true, *some* errors are not
505
 * propagated up but only logged in the error log file.
506
 *
507
 * The list of errors ignored in force_recovery = true mode
508
 * includes:
509
 * - a file can not be opened
510
 * - some of the files have incorrect metadata (such files are
511
 *   skipped)
512
 *
513
 * The goal of force_recovery = true mode is partial recovery
514
 * from a damaged/incorrect data directory. It doesn't
515
 * silence conditions such as out of memory or lack of OS
516
 * resources.
517
 *
518
 */
519
int
520
xdir_scan(struct xdir *dir, bool is_dir_required)
17,106✔
521
{
522
        DIR *dh = opendir(dir->dirname);        /* log dir */
17,106✔
523
        int64_t *signatures = NULL;             /* log file names */
17,106✔
524
        size_t s_count = 0, s_capacity = 0;
17,106✔
525

526
        if (dh == NULL) {
17,106✔
527
                if (!is_dir_required && errno == ENOENT)
4!
528
                        return 0;
3✔
529
                diag_set(SystemError, "error reading directory '%s'",
1!
530
                          dir->dirname);
531
                return -1;
1✔
532
        }
533

534
        int rc = -1;
17,102✔
535
        struct vclock *vclock;
536
        struct dirent *dent;
537
        /*
538
          A note regarding thread safety, readdir vs. readdir_r:
539

540
          POSIX explicitly makes the following guarantee: "The
541
          pointer returned by readdir() points to data which may
542
          be overwritten by another call to readdir() on the same
543
          directory stream. This data is not overwritten by another
544
          call to readdir() on a different directory stream.
545

546
          In practice, you don't have a problem with readdir(3)
547
          because Android's bionic, Linux's glibc, and OS X and iOS'
548
          libc all allocate per-DIR* buffers, and return pointers
549
          into those; in Android's case, that buffer is currently about
550
          8KiB. If future file systems mean that this becomes an actual
551
          limitation, we can fix the C library and all your applications
552
          will keep working.
553

554
          See also
555
          http://elliotth.blogspot.co.uk/2012/10/how-not-to-use-readdirr3.html
556
        */
557
        while ((dent = readdir(dh)) != NULL) {
151,501✔
558
                char *ext = strchr(dent->d_name, '.');
134,399✔
559
                if (ext == NULL)
134,399✔
560
                        continue;
125,710✔
561
                /*
562
                 * Compare the rest of the filename with
563
                 * dir->filename_ext.
564
                 */
565
                if (strcmp(ext, dir->filename_ext) != 0)
133,615✔
566
                        continue;
124,924✔
567

568
                char *dot;
569
                long long signature = strtoll(dent->d_name, &dot, 10);
8,691✔
570
                if (ext != dot ||
8,691!
571
                    signature == LLONG_MAX || signature == LLONG_MIN) {
8,689!
572
                        say_warn("can't parse `%s', skipping", dent->d_name);
2!
573
                        continue;
2✔
574
                }
575

576
                if (s_count == s_capacity) {
8,689✔
577
                        s_capacity = s_capacity > 0 ? 2 * s_capacity : 16;
5,221✔
578
                        size_t size = sizeof(*signatures) * s_capacity;
5,221✔
579
                        signatures = (int64_t *) realloc(signatures, size);
5,221✔
580
                        if (signatures == NULL) {
5,221!
581
                                diag_set(OutOfMemory,
×
582
                                          size, "realloc", "signatures array");
583
                                goto exit;
×
584
                        }
585
                }
586
                signatures[s_count++] = signature;
8,689✔
587
        }
588
        /** Sort the list of files */
589
        if (s_count > 1)
17,102✔
590
                qsort(signatures, s_count, sizeof(*signatures), cmp_i64);
1,962✔
591
        /**
592
         * Update the log dir index with the current state:
593
         * remove files which no longer exist, add files which
594
         * appeared since the last scan.
595
         */
596
        vclock = vclockset_first(&dir->index);
17,102✔
597
        for (unsigned i = 0; i < s_count || vclock != NULL;) {
25,839!
598
                int64_t s_old = vclock ? vclock_sum(vclock) : LLONG_MAX;
8,739✔
599
                int64_t s_new = i < s_count ? signatures[i] : LLONG_MAX;
8,739!
600
                if (s_old < s_new) {
8,739✔
601
                        /** Remove a deleted file from the index */
602
                        struct vclock *next =
603
                                vclockset_next(&dir->index, vclock);
50✔
604
                        vclockset_remove(&dir->index, vclock);
50✔
605
                        free(vclock);
50✔
606
                        vclock = next;
50✔
607
                } else if (s_old > s_new) {
8,689✔
608
                        /** Add a new file. */
609
                        if (xdir_index_file(dir, s_new) != 0) {
8,611✔
610
                                /*
611
                                 * force_recovery must not affect OOM
612
                                 */
613
                                struct error *e = diag_last_error(&fiber()->diag);
14!
614
                                if (!dir->force_recovery ||
14✔
615
                                    type_assignable(&type_OutOfMemory, e->type))
12!
616
                                        goto exit;
2✔
617
                                /** Skip a corrupted file */
618
                                error_log(e);
12✔
619
                        }
620
                        i++;
8,609✔
621
                } else {
622
                        assert(s_old == s_new && i < s_count &&
78!
623
                               vclock != NULL);
624
                        vclock = vclockset_next(&dir->index, vclock);
78✔
625
                        i++;
78✔
626
                }
627
        }
628
        /* Initialize expiration time of all files, saved inside index. */
629
        retention_index_update(dir, 0);
17,100✔
630
        rc = 0;
17,100✔
631

632
exit:
17,102✔
633
        closedir(dh);
17,102✔
634
        free(signatures);
17,102✔
635
        return rc;
17,102✔
636
}
637

638
int
639
xdir_check(struct xdir *dir)
3,690✔
640
{
641
        DIR *dh = opendir(dir->dirname);        /* log dir */
3,690✔
642
        if (dh == NULL) {
3,690!
643
                diag_set(SystemError, "error reading directory '%s'",
×
644
                          dir->dirname);
645
                return -1;
×
646
        }
647
        closedir(dh);
3,690✔
648
        return 0;
3,690✔
649
}
650

651
void
652
xdir_set_retention_period(struct xdir *xdir, double period)
4,472✔
653
{
654
        double old_period = xdir->retention_period;
4,472✔
655
        xdir->retention_period = period;
4,472✔
656
        retention_index_update(xdir, old_period);
4,472✔
657
}
4,472✔
658

659
void
660
xdir_get_retention_vclock(struct xdir *xdir, struct vclock *vclock)
3,943,630✔
661
{
662
        retention_index_get(&xdir->index, vclock);
3,943,630✔
663
}
3,943,630✔
664

665
void
666
xdir_set_retention_vclock(struct xdir *xdir, struct vclock *vclock)
3,705✔
667
{
668
        if (xdir->retention_period == 0)
3,705!
669
                return;
3,705✔
670

671
        struct vclock *find = vclockset_match(&xdir->index, vclock);
×
672
        assert(vclock_compare(find, vclock) == 0);
×
673
        retention_vclock_set(find, xdir->retention_period);
×
674
}
675

676
const char *
677
xdir_format_filename(struct xdir *dir, int64_t signature,
46,252✔
678
                enum log_suffix suffix)
679
{
680
        return tt_snprintf(PATH_MAX, "%s/%020lld%s%s",
92,505!
681
                           dir->dirname, (long long) signature,
46,252✔
682
                           dir->filename_ext, suffix == INPROGRESS ?
683
                                              inprogress_suffix : "");
684
}
685

686
void
687
xdir_collect_garbage(struct xdir *dir, int64_t signature, unsigned flags)
2,951✔
688
{
689
        unsigned rm_flags = XLOG_RM_VERBOSE;
2,951✔
690
        if (flags & XDIR_GC_ASYNC)
2,951✔
691
                rm_flags |= XLOG_RM_ASYNC;
2,943✔
692

693
        struct vclock retention_vclock;
694
        retention_index_get(&dir->index, &retention_vclock);
2,951!
695
        if (vclock_is_set(&retention_vclock) &&
2,951!
696
            signature > vclock_sum(&retention_vclock)) {
×
697
                /*
698
                 * This should not normally happen and can be a reason of
699
                 * xdir_retention misusage: retention_vclock must be always
700
                 * checked before invoking garbage collection. As we cannot
701
                 * return error code from this function (gc already updated
702
                 * its vclock), the best we can do is warning.
703
                 */
704
                assert(false);
×
705
                say_warn("Requested xdir gc vclock is greater than "
706
                         "retention vclock. Using retention vclock instead");
707
                signature = vclock_sum(&retention_vclock);
708
        }
709

710
        struct vclock *vclock;
711
        while ((vclock = vclockset_first(&dir->index)) != NULL &&
5,440!
712
               vclock_sum(vclock) < signature) {
5,337!
713
                const char *filename =
714
                        xdir_format_filename(dir, vclock_sum(vclock), NONE);
2,497!
715
                xlog_remove_file(filename, rm_flags);
2,497!
716
                vclockset_remove(&dir->index, vclock);
2,497!
717
                free(vclock);
2,497✔
718
                if (flags & XDIR_GC_REMOVE_ONE)
2,497✔
719
                        break;
8✔
720
        }
721
}
2,951✔
722

723
int
724
xdir_remove_file_by_vclock(struct xdir *dir, struct vclock *to_remove)
1✔
725
{
726
        struct vclock *find = vclockset_match(&dir->index, to_remove);
1✔
727
        if (vclock_compare(find, to_remove) != 0)
1!
728
                return -1;
×
729
        const char *filename =
730
                xdir_format_filename(dir, vclock_sum(find), NONE);
1✔
731
        if (!xlog_remove_file(filename, XLOG_RM_VERBOSE))
1!
732
                return -1;
×
733
        vclockset_remove(&dir->index, find);
1✔
734
        free(find);
1✔
735
        return 0;
1✔
736
}
737

738
/** Default implementation of xlog_file_is_temporary(). */
739
static bool
740
xlog_file_is_temporary_default(const char *filename)
12,873✔
741
{
742
        char *ext = strrchr(filename, '.');
12,873✔
743
        return ext != NULL && strcmp(ext, inprogress_suffix) == 0;
12,873✔
744
}
745

746
xlog_file_is_temporary_f xlog_file_is_temporary =
747
                                xlog_file_is_temporary_default;
748

749
void
750
xdir_remove_temporary_files(struct xdir *xdir)
2,349✔
751
{
752
        const char *dirname = xdir->dirname;
2,349✔
753
        DIR *dh = opendir(dirname);
2,349✔
754
        if (dh == NULL) {
2,349✔
755
                if (errno != ENOENT)
1!
756
                        say_syserror("error reading directory '%s'", dirname);
×
757
                return;
1✔
758
        }
759
        struct dirent *dent;
760
        while ((dent = readdir(dh)) != NULL) {
15,221✔
761
                const char *filename = tt_snprintf(PATH_MAX, "%s/%s",
25,746✔
762
                                                   dirname, dent->d_name);
12,873✔
763
                if (xlog_file_is_temporary(filename))
12,873✔
764
                        xlog_remove_file(filename, XLOG_RM_VERBOSE);
2✔
765
        }
766
        closedir(dh);
2,348✔
767
}
768

769
void
770
xdir_add_vclock(struct xdir *xdir, const struct vclock *vclock)
17,331✔
771
{
772
        struct vclock *copy = retention_vclock_new();
17,331✔
773
        vclock_copy(copy, vclock);
17,331✔
774
        vclockset_insert(&xdir->index, copy);
17,331✔
775
}
17,331✔
776

777
/* }}} */
778

779

780
/* {{{ struct xlog */
781

782
int
783
xlog_materialize(struct xlog *l)
22,016✔
784
{
785
        char *filename = l->filename;
22,016✔
786
        char new_filename[PATH_MAX];
787
        char *suffix = strrchr(filename, '.');
22,016✔
788

789
        assert(l->is_inprogress);
22,016!
790
        assert(suffix);
22,016!
791
        assert(strcmp(suffix, inprogress_suffix) == 0);
22,016!
792

793
        /* Create a new filename without '.inprogress' suffix. */
794
        memcpy(new_filename, filename, suffix - filename);
22,016✔
795
        new_filename[suffix - filename] = '\0';
22,016✔
796

797
        ERROR_INJECT_SLEEP(ERRINJ_XLOG_RENAME_DELAY);
22,026!
798

799
        if (rename(filename, new_filename) != 0) {
22,016!
800
                say_syserror("can't rename %s to %s", filename, new_filename);
×
801
                diag_set(SystemError, "failed to rename '%s' file",
×
802
                                filename);
803
                return -1;
22,016✔
804
        }
805
        l->is_inprogress = false;
22,016✔
806
        filename[suffix - filename] = '\0';
22,016✔
807
        return 0;
22,016✔
808
}
809

810
static int
811
xlog_init(struct xlog *xlog, const struct xlog_opts *opts)
23,501✔
812
{
813
        memset(xlog, 0, sizeof(*xlog));
23,501✔
814
        xlog->opts = *opts;
23,501✔
815
        xlog->sync_time = ev_monotonic_time();
23,501✔
816
        xlog->is_autocommit = true;
23,501✔
817
        obuf_create(&xlog->obuf, &cord()->slabc, XLOG_TX_AUTOCOMMIT_THRESHOLD);
23,501✔
818
        obuf_create(&xlog->zbuf, &cord()->slabc, XLOG_TX_AUTOCOMMIT_THRESHOLD);
23,501!
819
        if (!opts->no_compression) {
23,501✔
820
                xlog->zctx = ZSTD_createCCtx();
21,381✔
821
                if (xlog->zctx == NULL) {
21,381!
822
                        diag_set(ClientError, ER_COMPRESSION,
×
823
                                 "failed to create context");
824
                        return -1;
×
825
                }
826
        }
827
        return 0;
23,501✔
828
}
829

830
void
831
xlog_clear(struct xlog *l)
18,624✔
832
{
833
        memset(l, 0, sizeof(*l));
18,624✔
834
        l->fd = -1;
18,624✔
835
}
18,624✔
836

837
/**
838
 * Frees the write context allocated in xlog_init().
839
 * The xlog file must be closed.
840
 */
841
static void
842
xlog_free(struct xlog *xlog)
23,433✔
843
{
844
        assert(xlog->fd < 0);
23,433!
845
        assert(xlog->obuf.slabc == &cord()->slabc);
23,433!
846
        assert(xlog->zbuf.slabc == &cord()->slabc);
23,433!
847
        obuf_destroy(&xlog->obuf);
23,433✔
848
        obuf_destroy(&xlog->zbuf);
23,424✔
849
        ZSTD_freeCCtx(xlog->zctx);
23,430✔
850
        xlog->zctx = NULL;
23,434✔
851
}
23,434✔
852

853
int
854
xlog_create(struct xlog *xlog, const char *name, int flags,
22,060✔
855
            const struct xlog_meta *meta, const struct xlog_opts *opts)
856
{
857
        char meta_buf[XLOG_META_LEN_MAX];
858
        int meta_len;
859

860
        /*
861
         * Check whether a file with this name already exists.
862
         * We don't overwrite existing files.
863
         */
864
        if (access(name, F_OK) == 0) {
22,060✔
865
                errno = EEXIST;
1✔
866
                diag_set(SystemError, "file '%s' already exists", name);
1!
867
                goto err;
1✔
868
        }
869

870
        if (xlog_init(xlog, opts) != 0)
22,059!
871
                goto err;
×
872

873
        xlog->meta = *meta;
22,059✔
874
        xlog->is_inprogress = true;
22,059✔
875
        snprintf(xlog->filename, sizeof(xlog->filename), "%s%s", name, inprogress_suffix);
22,059✔
876

877
        /* Make directory if needed (gh-5090). */
878
        if (mkdirpath(xlog->filename) != 0) {
22,059!
879
                diag_set(SystemError, "failed to create path '%s'",
7!
880
                         xlog->filename);
881
                goto err;
7✔
882
        }
883

884
        flags |= O_RDWR | O_CREAT | O_EXCL | O_CLOEXEC;
22,052✔
885

886
        /*
887
         * Open the <lsn>.<suffix>.inprogress file.
888
         * If it exists, open will fail. Always open/create
889
         * a file with .inprogress suffix: for snapshots,
890
         * the rename is done when the snapshot is complete.
891
         * For xlogs, we can rename only when we have written
892
         * the log file header, otherwise replication relay
893
         * may think that this is a corrupt file and stop
894
         * replication.
895
         */
896
        xlog->fd = open(xlog->filename, flags, 0644);
22,052!
897
        if (xlog->fd < 0) {
22,052!
898
                diag_set(SystemError, "failed to create file '%s'",
×
899
                         xlog->filename);
900
                goto err_open;
×
901
        }
902

903
        /* Format metadata */
904
        meta_len = xlog_meta_format(&xlog->meta, meta_buf, sizeof(meta_buf));
22,052!
905
        if (meta_len < 0)
22,052!
906
                goto err_write;
×
907
        /* Formatted metadata must fit into meta_buf */
908
        assert(meta_len < (int)sizeof(meta_buf));
22,052!
909

910
        /* Write metadata */
911
        if (fio_writen(xlog->fd, meta_buf, meta_len) < 0) {
22,052!
912
                diag_set(SystemError, "%s: failed to write xlog meta",
×
913
                         xlog->filename);
914
                goto err_write;
×
915
        }
916

917
        xlog->offset = meta_len; /* first log starts after meta */
22,052✔
918
        return 0;
22,060✔
919
err_write:
×
920
        close(xlog->fd);
×
921
        xlog->fd = -1;
×
922
        xlog_remove_file(xlog->filename, 0); /* try to remove incomplete file */
×
923
err_open:
×
924
        xlog_free(xlog);
×
925
err:
8✔
926
        return -1;
8✔
927
}
928

929
int
930
xlog_open(struct xlog *xlog, const char *name, const struct xlog_opts *opts)
1,442✔
931
{
932
        char magic[sizeof(log_magic_t)];
933
        char meta_buf[XLOG_META_LEN_MAX];
934
        const char *meta = meta_buf;
1,442✔
935
        int meta_len;
936
        int rc;
937

938
        if (xlog_init(xlog, opts) != 0)
1,442!
939
                goto err;
×
940

941
        strlcpy(xlog->filename, name, sizeof(xlog->filename));
1,442!
942

943
        xlog->fd = open(xlog->filename, O_RDWR | O_CLOEXEC);
1,442!
944
        if (xlog->fd < 0) {
1,442!
945
                say_syserror("open, [%s]", name);
×
946
                diag_set(SystemError, "failed to open file '%s'", name);
×
947
                goto err_open;
×
948
        }
949

950
        meta_len = fio_read(xlog->fd, meta_buf, sizeof(meta_buf));
1,442!
951
        if (meta_len < 0) {
1,442!
952
                diag_set(SystemError, "failed to read file '%s'",
×
953
                         xlog->filename);
954
                goto err_read;
×
955
        }
956

957
        rc = xlog_meta_parse(&xlog->meta, &meta, meta + meta_len);
1,442!
958
        if (rc < 0)
1,442!
959
                goto err_read;
×
960
        if (rc > 0) {
1,442✔
961
                diag_set(XlogError, "Unexpected end of file");
1!
962
                goto err_read;
1✔
963
        }
964

965
        /* Check if the file has EOF marker. */
966
        xlog->offset = fio_lseek(xlog->fd, -(off_t)sizeof(magic), SEEK_END);
1,441!
967
        if (xlog->offset < 0)
1,441!
968
                goto no_eof;
×
969
        /* Use pread() so as not to change file pointer. */
970
        rc = fio_pread(xlog->fd, magic, sizeof(magic), xlog->offset);
1,441!
971
        if (rc < 0) {
1,441!
972
                diag_set(SystemError, "failed to read file '%s'",
×
973
                         xlog->filename);
974
                goto err_read;
×
975
        }
976
        if (rc != sizeof(magic) || load_u32(magic) != eof_marker) {
1,441!
977
no_eof:
32✔
978
                xlog->offset = fio_lseek(xlog->fd, 0, SEEK_END);
32!
979
                if (xlog->offset < 0) {
32!
980
                        diag_set(SystemError, "failed to seek file '%s'",
×
981
                                 xlog->filename);
982
                        goto err_read;
×
983
                }
984
        } else {
985
                /* Truncate the file to erase the EOF marker. */
986
                if (ftruncate(xlog->fd, xlog->offset) != 0) {
1,409!
987
                        diag_set(SystemError, "failed to truncate file '%s'",
×
988
                                 xlog->filename);
989
                        goto err_read;
×
990
                }
991
        }
992
        return 0;
1,442✔
993
err_read:
1✔
994
        close(xlog->fd);
1!
995
        xlog->fd = -1;
1✔
996
err_open:
1✔
997
        xlog_free(xlog);
1!
998
err:
1✔
999
        return -1;
1✔
1000
}
1001

1002
int
1003
xdir_touch_xlog(struct xdir *dir, const struct vclock *vclock)
199✔
1004
{
1005
        int64_t signature = vclock_sum(vclock);
199✔
1006
        const char *filename = xdir_format_filename(dir, signature, NONE);
199✔
1007

1008
        if (dir->type != SNAP) {
199!
1009
                assert(false);
×
1010
                diag_set(SystemError, "Can't touch xlog '%s'", filename);
1011
                return -1;
1012
        }
1013
        if (utime(filename, NULL) != 0) {
199✔
1014
                diag_set(SystemError, "Can't update xlog timestamp: '%s'",
1!
1015
                         filename);
1016
                return -1;
1✔
1017
        }
1018
        return 0;
198✔
1019
}
1020

1021
/**
1022
 * In case of error, writes a message to the error log
1023
 * and sets errno.
1024
 */
1025
int
1026
xdir_create_xlog(struct xdir *dir, struct xlog *xlog,
16,437✔
1027
                 const struct vclock *vclock)
1028
{
1029
        int64_t signature = vclock_sum(vclock);
16,437!
1030
        assert(signature >= 0);
16,437!
1031
        assert(!tt_uuid_is_nil(dir->instance_uuid));
16,437!
1032

1033
        /*
1034
         * For WAL dir: store vclock of the previous xlog file
1035
         * to check for gaps on recovery.
1036
         */
1037
        const struct vclock *prev_vclock = NULL;
16,437✔
1038
        if (dir->type == XLOG && !vclockset_empty(&dir->index))
16,437!
1039
                prev_vclock = vclockset_last(&dir->index);
6,936!
1040

1041
        struct xlog_meta meta;
1042
        xlog_meta_create(&meta, dir->filetype, dir->instance_uuid,
16,437!
1043
                         vclock, prev_vclock);
1044

1045
        const char *filename = xdir_format_filename(dir, signature, NONE);
16,437!
1046
        if (xlog_create(xlog, filename, dir->open_wflags, &meta,
16,437✔
1047
                        &dir->opts) != 0)
16,437!
1048
                return -1;
16,437✔
1049

1050
        /* Rename xlog file */
1051
        if (dir->suffix != INPROGRESS && xlog_materialize(xlog) != 0) {
16,429!
1052
                xlog_discard(xlog);
×
1053
                return -1;
×
1054
        }
1055

1056
        return 0;
16,429✔
1057
}
1058

1059
/**
 * Preallocate @a len bytes of disk space after the region already
 * reserved for @a log, without changing the visible file size.
 *
 * This is a no-op returning 0 when fallocate() is unavailable at
 * build time. It is also a no-op when fallocate() fails with ENOSYS
 * or EOPNOTSUPP; that case is logged once and remembered, and later
 * calls return 0 immediately.
 *
 * @retval 0  success, or preallocation is unsupported
 * @retval -1 fallocate() failed, diag is set
 */
ssize_t
xlog_fallocate(struct xlog *log, size_t len)
{
#ifndef HAVE_FALLOCATE
        (void)log;
        (void)len;
        return 0;
#else
        static bool fallocate_not_supported = false;
        if (fallocate_not_supported)
                return 0;
        /*
         * Keep the file size, because it is used to sync
         * concurrent readers vs the writer: xlog_cursor
         * assumes that everything written before EOF is
         * valid data.
         */
        if (fallocate(log->fd, FALLOC_FL_KEEP_SIZE,
                      log->offset + log->allocated, len) == 0) {
                log->allocated += len;
                return 0;
        }
        if (errno != ENOSYS && errno != EOPNOTSUPP) {
                diag_set(SystemError, "%s: can't allocate disk space",
                         log->filename);
                return -1;
        }
        say_warn("fallocate is not supported, "
                 "proceeding without it");
        fallocate_not_supported = true;
        return 0;
#endif /* HAVE_FALLOCATE */
}
1093

1094
/**
1095
 * Write a sequence of uncompressed xrow objects.
1096
 *
1097
 * @retval -1 error
1098
 * @retval >= 0 the number of bytes written
1099
 */
1100
static off_t
1101
xlog_tx_write_plain(struct xlog *log)
4,046,580✔
1102
{
1103
        /**
1104
         * We created an obuf savepoint at start of xlog_tx,
1105
         * now populate it with data.
1106
         */
1107
        char *fixheader = (char *)log->obuf.iov[0].iov_base;
4,046,580✔
1108
        memcpy(fixheader, &row_marker, sizeof(log_magic_t));
4,046,580✔
1109
        char *data = fixheader + sizeof(log_magic_t);
4,046,580✔
1110

1111
        data = mp_encode_uint(data,
8,093,160✔
1112
                              obuf_size(&log->obuf) - XLOG_FIXHEADER_SIZE);
4,046,580✔
1113
        /* Encode crc32 for previous row */
1114
        data = mp_encode_uint(data, 0);
4,046,580✔
1115
        /* Encode crc32 for current row */
1116
        uint32_t crc32c = 0;
4,046,580✔
1117
        struct iovec *iov;
1118
        size_t offset = XLOG_FIXHEADER_SIZE;
4,046,580✔
1119
        for (iov = log->obuf.iov; iov->iov_len; ++iov) {
8,093,180✔
1120
                crc32c = crc32_calc(crc32c,
8,093,180✔
1121
                                    (char *)iov->iov_base + offset,
4,046,580✔
1122
                                    iov->iov_len - offset);
4,046,580✔
1123
                offset = 0;
4,046,600✔
1124
        }
1125
        data = mp_encode_uint(data, crc32c);
4,046,590✔
1126
        /*
1127
         * Encode a padding, to ensure the resulting
1128
         * fixheader always has the same size.
1129
         */
1130
        ssize_t padding = XLOG_FIXHEADER_SIZE - (data - fixheader);
4,046,590✔
1131
        if (padding > 0) {
4,046,590!
1132
                data = mp_encode_strl(data, padding - 1);
4,046,590✔
1133
                if (padding > 1) {
4,046,590!
1134
                        memset(data, 0, padding - 1);
4,046,590✔
1135
                        data += padding - 1;
4,046,590✔
1136
                }
1137
        }
1138

1139
        ERROR_INJECT(ERRINJ_WAL_WRITE_DISK, {
4,046,590!
1140
                diag_set(ClientError, ER_INJECTION, "xlog write injection");
1141
                return -1;
1142
        });
1143

1144
        ssize_t written = fio_writevn(log->fd, log->obuf.iov, log->obuf.pos + 1);
4,046,580✔
1145
        if (written < 0) {
4,046,590!
1146
                diag_set(SystemError, "failed to write to '%s' file",
×
1147
                         log->filename);
1148
                return -1;
×
1149
        }
1150
        return obuf_size(&log->obuf);
4,046,590✔
1151
}
1152

1153
/**
1154
 * Write a compressed block of xrow objects.
1155
 * @retval -1  error
1156
 * @retval >= 0 the number of bytes written
1157
 */
1158
static off_t
1159
xlog_tx_write_zstd(struct xlog *log)
19,045✔
1160
{
1161
        char *fixheader = (char *)obuf_alloc(&log->zbuf,
19,045✔
1162
                                             XLOG_FIXHEADER_SIZE);
1163
        if (fixheader == NULL) {
19,045!
1164
                diag_set(OutOfMemory, XLOG_FIXHEADER_SIZE, "runtime arena",
×
1165
                         "compression buffer");
1166
                goto error;
×
1167
        }
1168
        uint32_t crc32c = 0;
19,045✔
1169
        struct iovec *iov;
1170
        /* 3 is compression level. */
1171
        ZSTD_compressBegin(log->zctx, 3);
19,045✔
1172
        size_t offset = XLOG_FIXHEADER_SIZE;
19,046✔
1173
        for (iov = log->obuf.iov; iov->iov_len; ++iov) {
38,657✔
1174
                /* Estimate max output buffer size. */
1175
                size_t zmax_size = ZSTD_compressBound(iov->iov_len - offset);
19,608✔
1176
                /* Allocate a destination buffer. */
1177
                void *zdst = obuf_reserve(&log->zbuf, zmax_size);
19,608✔
1178
                if (!zdst) {
19,608!
1179
                        diag_set(OutOfMemory, zmax_size, "runtime arena",
×
1180
                                  "compression buffer");
1181
                        goto error;
×
1182
                }
1183
                size_t (*fcompress)(ZSTD_CCtx *, void *, size_t,
1184
                                    const void *, size_t);
1185
                /*
1186
                 * If it's the last iov or the last
1187
                 * log has 0 bytes, end the stream.
1188
                 */
1189
                if (iov == log->obuf.iov + log->obuf.pos ||
19,608✔
1190
                    !(iov + 1)->iov_len) {
562!
1191
                        fcompress = ZSTD_compressEnd;
19,046✔
1192
                } else {
1193
                        fcompress = ZSTD_compressContinue;
562✔
1194
                }
1195
                size_t zsize = fcompress(log->zctx, zdst, zmax_size,
39,217✔
1196
                                         (char *)iov->iov_base + offset,
19,608✔
1197
                                         iov->iov_len - offset);
19,608✔
1198
                if (ZSTD_isError(zsize)) {
19,609!
1199
                        diag_set(ClientError, ER_COMPRESSION,
×
1200
                                 ZSTD_getErrorName(zsize));
1201
                        goto error;
×
1202
                }
1203
                /* Advance output buffer to the end of compressed data. */
1204
                obuf_alloc(&log->zbuf, zsize);
19,610✔
1205
                /* Update crc32c */
1206
                crc32c = crc32_calc(crc32c, (char *)zdst, zsize);
19,610✔
1207
                /* Discount fixheader size for all iovs after first. */
1208
                offset = 0;
19,611✔
1209
        }
1210

1211
        memcpy(fixheader, &zrow_marker, sizeof(log_magic_t));
19,049✔
1212
        char *data;
1213
        data = fixheader + sizeof(log_magic_t);
19,049✔
1214
        data = mp_encode_uint(data,
38,096✔
1215
                              obuf_size(&log->zbuf) - XLOG_FIXHEADER_SIZE);
19,049✔
1216
        /* Encode crc32 for previous row */
1217
        data = mp_encode_uint(data, 0);
19,048✔
1218
        /* Encode crc32 for current row */
1219
        data = mp_encode_uint(data, crc32c);
19,048✔
1220
        /* Encode padding */
1221
        ssize_t padding;
1222
        padding = XLOG_FIXHEADER_SIZE - (data - fixheader);
19,048✔
1223
        if (padding > 0) {
19,048!
1224
                data = mp_encode_strl(data, padding - 1);
19,048✔
1225
                if (padding > 1) {
19,048!
1226
                        memset(data, 0, padding - 1);
19,048✔
1227
                        data += padding - 1;
19,048✔
1228
                }
1229
        }
1230

1231
        ERROR_INJECT(ERRINJ_WAL_WRITE_DISK, {
19,048!
1232
                diag_set(ClientError, ER_INJECTION, "xlog write injection");
1233
                obuf_reset(&log->zbuf);
1234
                goto error;
1235
        });
1236

1237
        ssize_t written;
1238
        written = fio_writevn(log->fd, log->zbuf.iov,
38,084✔
1239
                              log->zbuf.pos + 1);
19,042✔
1240
        if (written < 0) {
19,042!
1241
                diag_set(SystemError, "failed to write to '%s' file",
×
1242
                         log->filename);
1243
                goto error;
×
1244
        }
1245
        obuf_reset(&log->zbuf);
19,042✔
1246
        return written;
19,042✔
1247
error:
6✔
1248
        obuf_reset(&log->zbuf);
6✔
1249
        return -1;
6✔
1250
}
1251

1252
/*
 * File syncing and posix_fadvise() ranges are rounded to a page
 * boundary (4096 bytes).
 */
#define SYNC_MASK                (4096 - 1)
#define SYNC_ROUND_DOWN(size)        ((size) & ~SYNC_MASK)
/*
 * The argument is parenthesized before adding the mask so that
 * expressions with low-precedence operators round correctly.
 */
#define SYNC_ROUND_UP(size)        SYNC_ROUND_DOWN((size) + SYNC_MASK)
1256

1257
/**
1258
 * Writes xlog batch to file
1259
 */
1260
static ssize_t
1261
xlog_tx_write(struct xlog *log)
4,065,630✔
1262
{
1263
        if (obuf_size(&log->obuf) == XLOG_FIXHEADER_SIZE)
4,065,630!
1264
                return 0;
×
1265
        ssize_t written;
1266

1267
        if (!log->opts.no_compression &&
4,065,630✔
1268
            obuf_size(&log->obuf) >= XLOG_TX_COMPRESS_THRESHOLD) {
4,039,230✔
1269
                written = xlog_tx_write_zstd(log);
19,045✔
1270
        } else {
1271
                written = xlog_tx_write_plain(log);
4,046,580✔
1272
        }
1273
        ERROR_INJECT(ERRINJ_WAL_WRITE, {
4,065,640!
1274
                diag_set(ClientError, ER_INJECTION, "xlog write injection");
1275
                written = -1;
1276
        });
1277

1278
        obuf_reset(&log->obuf);
4,065,640✔
1279
        /*
1280
         * Simplify recovery after a temporary write failure:
1281
         * truncate the file to the best known good write
1282
         * position.
1283
         */
1284
        if (written < 0) {
4,065,640✔
1285
                if (lseek(log->fd, log->offset, SEEK_SET) < 0 ||
316!
1286
                    ftruncate(log->fd, log->offset) != 0)
159✔
1287
                        panic_syserror("failed to truncate xlog after write error");
×
1288
                log->allocated = 0;
159✔
1289
                return -1;
159✔
1290
        }
1291
        if (log->allocated > (size_t)written)
4,065,480✔
1292
                log->allocated -= written;
3,943,440✔
1293
        else
1294
                log->allocated = 0;
122,048✔
1295
        log->offset += written;
4,065,480✔
1296
        log->rows += log->tx_rows;
4,065,480✔
1297
        log->tx_rows = 0;
4,065,480✔
1298
        if ((log->opts.sync_interval && log->offset >=
4,065,480✔
1299
            (off_t)(log->synced_size + log->opts.sync_interval)) ||
88,432!
1300
            (log->opts.rate_limit && log->offset >=
4,065,480✔
1301
            (off_t)(log->synced_size + log->opts.rate_limit))) {
324✔
1302
                off_t sync_from = SYNC_ROUND_DOWN(log->synced_size);
5✔
1303
                size_t sync_len = SYNC_ROUND_UP(log->offset) -
5✔
1304
                                  sync_from;
1305
                if (log->opts.rate_limit > 0) {
5✔
1306
                        double throttle_time;
1307
                        throttle_time = (double)sync_len / log->opts.rate_limit -
6✔
1308
                                        (ev_monotonic_time() - log->sync_time);
3✔
1309
                        if (throttle_time > 0)
3!
1310
                                ev_sleep(throttle_time);
3✔
1311
                }
1312
                /** sync data from cache to disk */
1313
#ifdef HAVE_SYNC_FILE_RANGE
1314
                sync_file_range(log->fd, sync_from, sync_len,
5✔
1315
                                SYNC_FILE_RANGE_WAIT_BEFORE |
1316
                                SYNC_FILE_RANGE_WRITE |
1317
                                SYNC_FILE_RANGE_WAIT_AFTER);
1318
#else
1319
                fdatasync(log->fd);
1320
#endif /* HAVE_SYNC_FILE_RANGE */
1321
                log->sync_time = ev_monotonic_time();
3✔
1322
                if (log->opts.free_cache) {
3!
1323
#ifdef HAVE_POSIX_FADVISE
1324
                        /** free page cache */
1325
                        if (posix_fadvise(log->fd, sync_from, sync_len,
×
1326
                                          POSIX_FADV_DONTNEED) != 0) {
1327
                                say_syserror("posix_fadvise, fd=%i", log->fd);
×
1328
                        }
1329
#else
1330
                        (void) sync_from;
1331
                        (void) sync_len;
1332
#endif /* HAVE_POSIX_FADVISE */
1333
                }
1334
                log->synced_size = log->offset;
3✔
1335
        }
1336
        return written;
4,065,480✔
1337
}
1338

1339
/*
1340
 * Add a row to a log and possibly flush the log.
1341
 *
1342
 * @retval  -1 error, check diag.
1343
 * @retval >=0 the number of bytes written to buffer.
1344
 */
1345
ssize_t
1346
xlog_write_row(struct xlog *log, const struct xrow_header *packet)
15,761,800✔
1347
{
1348
        /*
1349
         * Automatically reserve space for a fixheader when adding
1350
         * the first row in * a log. The fixheader is populated
1351
         * at write. @sa xlog_tx_write().
1352
         */
1353
        if (obuf_size(&log->obuf) == 0) {
15,761,800!
1354
                if (!obuf_alloc(&log->obuf, XLOG_FIXHEADER_SIZE)) {
4,065,660!
1355
                        diag_set(OutOfMemory, XLOG_FIXHEADER_SIZE,
×
1356
                                  "runtime arena", "xlog tx output buffer");
1357
                        return -1;
15,748,500✔
1358
                }
1359
        }
1360

1361
        struct obuf_svp svp = obuf_create_svp(&log->obuf);
15,761,100!
1362
        size_t page_offset = obuf_size(&log->obuf);
15,760,600!
1363
        /** encode row into iovec */
1364
        struct iovec iov[XROW_IOVMAX];
1365
        /** don't write sync to the disk */
1366
        size_t region_svp = region_used(&fiber()->gc);
15,758,500!
1367
        int iovcnt;
1368
        xrow_encode(packet, /*sync=*/0, /*fixheader_len=*/0, iov, &iovcnt);
15,759,500!
1369
        for (int i = 0; i < iovcnt; ++i) {
49,418,300✔
1370
                struct errinj *inj = errinj(ERRINJ_WAL_WRITE_PARTIAL,
33,649,400!
1371
                                            ERRINJ_INT);
1372
                if (inj != NULL && inj->iparam >= 0 &&
33,649,400!
1373
                    obuf_size(&log->obuf) > (size_t)inj->iparam) {
12!
1374
                        diag_set(ClientError, ER_INJECTION,
4!
1375
                                 "xlog write injection");
1376
                        obuf_rollback_to_svp(&log->obuf, &svp);
4!
1377
                        region_truncate(&fiber()->gc, region_svp);
4!
1378
                        return -1;
4✔
1379
                };
1380
                if (obuf_dup(&log->obuf, iov[i].iov_base, iov[i].iov_len) <
33,649,400!
1381
                    iov[i].iov_len) {
33,672,100!
1382
                        diag_set(OutOfMemory, XLOG_FIXHEADER_SIZE,
×
1383
                                  "runtime arena", "xlog tx output buffer");
1384
                        obuf_rollback_to_svp(&log->obuf, &svp);
×
1385
                        region_truncate(&fiber()->gc, region_svp);
×
1386
                        return -1;
×
1387
                }
1388
        }
1389
        region_truncate(&fiber()->gc, region_svp);
15,768,900!
1390
        assert(iovcnt <= XROW_IOVMAX);
15,748,800!
1391
        log->tx_rows++;
15,748,800✔
1392

1393
        size_t row_size = obuf_size(&log->obuf) - page_offset;
15,748,800!
1394
        if (log->is_autocommit &&
15,748,500✔
1395
            obuf_size(&log->obuf) >= XLOG_TX_AUTOCOMMIT_THRESHOLD &&
2,203,220!
1396
            xlog_tx_write(log) < 0)
401!
1397
                return -1;
×
1398

1399
        return row_size;
15,748,500✔
1400
}
1401

1402
/**
1403
 * Begin a multi-statement xlog transaction. All xrow objects
1404
 * of a single transaction share the same header and checksum
1405
 * and are normally written at once.
1406
 */
1407
void
1408
xlog_tx_begin(struct xlog *log)
4,107,340✔
1409
{
1410
        log->is_autocommit = false;
4,107,340✔
1411
}
4,107,340✔
1412

1413
/*
1414
 * End a non-interruptible batch of rows, thus enable flushes of
1415
 * a transaction at any time, on threshold. If the buffer is big
1416
 * enough already, flush it at once here.
1417
 *
1418
 * @retval -1  error
1419
 * @retval >= 0 the number of bytes written to disk
1420
 */
1421
ssize_t
1422
xlog_tx_commit(struct xlog *log)
4,107,320✔
1423
{
1424
        log->is_autocommit = true;
4,107,320✔
1425
        if (obuf_size(&log->obuf) >= XLOG_TX_AUTOCOMMIT_THRESHOLD) {
4,107,320✔
1426
                return xlog_tx_write(log);
1,062✔
1427
        }
1428
        return 0;
4,106,260✔
1429
}
1430

1431
/*
1432
 * Rollback a batch of buffered rows without writing to file
1433
 */
1434
void
1435
xlog_tx_rollback(struct xlog *log)
4✔
1436
{
1437
        log->is_autocommit = true;
4✔
1438
        log->tx_rows = 0;
4✔
1439
        obuf_reset(&log->obuf);
4✔
1440
}
4✔
1441

1442
/**
1443
 * Flush any outstanding xlog_tx transactions at the end of
1444
 * a WAL write batch.
1445
 */
1446
ssize_t
1447
xlog_flush(struct xlog *log)
4,080,240✔
1448
{
1449
        assert(log->is_autocommit);
4,080,240!
1450
        if (log->obuf.used == 0)
4,080,240✔
1451
                return 0;
16,072✔
1452
        return xlog_tx_write(log);
4,064,170✔
1453
}
1454

1455
static int
1456
sync_cb(eio_req *req)
3,705✔
1457
{
1458
        int fd = (intptr_t) req->data;
3,705✔
1459
        if (req->result) {
3,705!
1460
                errno = req->errorno;
×
1461
                say_syserror("%s: fsync() failed",
×
1462
                             fio_filename(fd));
1463
                errno = 0;
×
1464
        }
1465
        close(fd);
3,705✔
1466
        return 0;
3,705✔
1467
}
1468

1469
/**
1470
 * Syncs an xlog object to disk.
1471
 *
1472
 * If the sync_is_async flag is set in xlog_opts, fsync is called
1473
 * asynchronously, without checking the result.
1474
 *
1475
 * Returns 0 on success. On failure, sets diag returns -1.
1476
 */
1477
static int
1478
xlog_sync(struct xlog *l)
23,413✔
1479
{
1480
        if (l->opts.sync_is_async) {
23,413✔
1481
                int fd = dup(l->fd);
10,670✔
1482
                if (fd == -1) {
10,670!
1483
                        diag_set(SystemError, "failed to duplicate fd %d",
×
1484
                                 l->fd);
1485
                        return -1;
×
1486
                }
1487
                eio_fsync(fd, 0, sync_cb, (void *) (intptr_t) fd);
10,670✔
1488
        } else if (fsync(l->fd) < 0) {
12,743!
1489
                diag_set(SystemError, "failed to sync file '%s'", l->filename);
×
1490
                return -1;
×
1491
        }
1492
        return 0;
23,412✔
1493
}
1494

1495
/**
1496
 * Writes the EOF marker to the xlog file.
1497
 *
1498
 * Returns 0 on success. On failure, sets diag returns -1.
1499
 */
1500
static int
1501
xlog_write_eof(struct xlog *l)
23,413✔
1502
{
1503
        ERROR_INJECT(ERRINJ_WAL_WRITE_EOF, return 0);
23,413!
1504

1505
        /*
1506
         * Free disk space preallocated with xlog_fallocate().
1507
         * Don't write the eof marker if this fails, otherwise
1508
         * we'll get "data after eof marker" error on recovery.
1509
         */
1510
        if (l->allocated > 0 && ftruncate(l->fd, l->offset) < 0) {
23,405!
1511
                diag_set(SystemError, "failed to truncate file '%s'",
×
1512
                         l->filename);
1513
                return -1;
×
1514
        }
1515

1516
        if (fio_writen(l->fd, &eof_marker, sizeof(eof_marker)) < 0) {
23,405!
1517
                diag_set(SystemError, "failed to write to file '%s'",
×
1518
                         l->filename);
1519
                return -1;
×
1520
        }
1521
        return 0;
23,405✔
1522
}
1523

1524
int
1525
xlog_close_reuse_fd(struct xlog *l, int *fd)
23,414✔
1526
{
1527
        assert(l->fd >= 0);
23,414!
1528
        int rc = xlog_flush(l) < 0 ? -1 : 0;
23,414✔
1529
        if (rc == 0)
23,415✔
1530
                rc = xlog_write_eof(l);
23,413✔
1531
        if (rc == 0)
23,415✔
1532
                rc = xlog_sync(l);
23,413✔
1533
        *fd = l->fd;
23,415✔
1534
        l->fd = -1;
23,415✔
1535
        xlog_free(l);
23,415✔
1536
        return rc;
23,415✔
1537
}
1538

1539
int
1540
xlog_close(struct xlog *l)
20,610✔
1541
{
1542
        int fd;
1543
        int rc = xlog_close_reuse_fd(l, &fd);
20,610!
1544
        close(fd);
20,611!
1545
        return rc;
20,611✔
1546
}
1547

1548
void
1549
xlog_discard(struct xlog *l)
50✔
1550
{
1551
        if (l->fd >= 0) {
50✔
1552
                close(l->fd);
18✔
1553
                l->fd = -1;
18✔
1554
                xlog_free(l);
18✔
1555
        }
1556
        if (l->filename[0] != '\0') {
50✔
1557
                assert(l->is_inprogress);
35!
1558
                xlog_remove_file(l->filename, 0);
35✔
1559
                l->filename[0] = '\0';
35✔
1560
        }
1561
}
50✔
1562

1563
/* }}} */
1564

1565
/* {{{ struct xlog_cursor */
1566

1567
enum {
1568
        /**
1569
         * Min and max values for xlog_cursor::read_ahead.
1570
         */
1571
        XLOG_READ_AHEAD_MIN = XLOG_TX_AUTOCOMMIT_THRESHOLD,
1572
        XLOG_READ_AHEAD_MAX = 8 * 1024 * 1024,
1573
};
1574

1575
/**
1576
 * Ensure that at least count bytes are in read buffer
1577
 *
1578
 * @retval 0 at least count bytes are in read buf
1579
 * @retval 1 if eof
1580
 * @retval -1 if error
1581
 */
1582
static int
1583
xlog_cursor_ensure(struct xlog_cursor *cursor, size_t count)
286,798✔
1584
{
1585
        if (ibuf_used(&cursor->rbuf) >= count)
286,798✔
1586
                return 0;
204,210✔
1587
        /* in-memory mode */
1588
        if (cursor->fd < 0)
82,582✔
1589
                return 1;
2,878✔
1590

1591
        size_t to_load = count - ibuf_used(&cursor->rbuf);
79,704✔
1592
        to_load += cursor->read_ahead;
79,697✔
1593

1594
        void *dst = ibuf_reserve(&cursor->rbuf, to_load);
79,697✔
1595
        if (dst == NULL) {
79,676!
1596
                diag_set(OutOfMemory, to_load, "runtime",
×
1597
                         "xlog cursor read buffer");
1598
                return -1;
×
1599
        }
1600
        ssize_t readen;
1601
        readen = fio_pread(cursor->fd, dst, to_load,
79,676✔
1602
                           cursor->read_offset);
1603
        struct errinj *inj = errinj(ERRINJ_XLOG_READ, ERRINJ_INT);
79,854!
1604
        if (inj != NULL && inj->iparam >= 0 &&
79,856!
1605
            inj->iparam < cursor->read_offset) {
×
1606
                readen = -1;
×
1607
                errno = EIO;
×
1608
        };
1609
        if (readen < 0) {
79,856!
1610
                diag_set(SystemError, "failed to read '%s' file",
×
1611
                         cursor->name);
1612
                return -1;
×
1613
        }
1614
        assert((size_t)readen <= to_load);
79,856!
1615
        /* ibuf_reserve() has been called above, ibuf_alloc() must not fail */
1616
        ibuf_alloc(&cursor->rbuf, readen);
79,856✔
1617
        /* Shrink the read buffer to reduce the memory consumption. */
1618
        if (cursor->need_rbuf_shrink) {
79,843✔
1619
                ibuf_shrink(&cursor->rbuf);
61,316✔
1620
                cursor->need_rbuf_shrink = false;
61,190✔
1621
        }
1622
        /*
1623
         * Grow readahead size if the requested number of bytes was successfully
1624
         * read, and decrease it to the minimum otherwise.
1625
         */
1626
        if ((size_t)readen == to_load) {
79,717✔
1627
                if (cursor->read_ahead * 2 <= XLOG_READ_AHEAD_MAX)
163✔
1628
                        cursor->read_ahead *= 2;
156✔
1629
        } else {
1630
                cursor->need_rbuf_shrink = true;
79,554✔
1631
                cursor->read_ahead = XLOG_READ_AHEAD_MIN;
79,554✔
1632
        }
1633
        cursor->read_offset += readen;
79,717✔
1634
        return ibuf_used(&cursor->rbuf) >= count ? 0: 1;
79,717✔
1635
}
1636

1637
/**
1638
 * Decompress zstd-compressed buf into cursor row block
1639
 *
1640
 * @retval -1 error, check diag
1641
 * @retval  0 data fully decompressed
1642
 * @retval  1 need more bytes in the output buffer
1643
 */
1644
static int
1645
xlog_cursor_decompress(char **rows, char *rows_end, const char **data,
15,595✔
1646
                       const char *data_end, ZSTD_DStream *zdctx)
1647
{
1648
        ZSTD_inBuffer input = {*data, (size_t)(data_end - *data), 0};
15,595✔
1649
        ZSTD_outBuffer output = {*rows, (size_t)(rows_end - *rows), 0};
15,595✔
1650

1651
        while (input.pos < input.size && output.pos < output.size) {
31,192✔
1652
                size_t rc = ZSTD_decompressStream(zdctx, &output, &input);
15,595!
1653
                if (ZSTD_isError(rc)) {
15,597!
1654
                        diag_set(ClientError, ER_DECOMPRESSION,
×
1655
                                 ZSTD_getErrorName(rc));
1656
                        return -1;
15,597✔
1657
                }
1658
                assert(output.pos <= (size_t)(rows_end - *rows));
15,597!
1659
                *rows = (char *)output.dst + output.pos;
15,597✔
1660
                *data = (char *)input.src + input.pos;
15,597✔
1661
        }
1662
        return input.pos == input.size ? 0: 1;
15,597✔
1663
}
1664

1665
/**
1666
 * xlog fixheader struct
1667
 */
1668
struct xlog_fixheader {
1669
        /**
1670
         * xlog tx magic, row_marker for plain xrows
1671
         * or zrow_marker for compressed.
1672
         */
1673
        log_magic_t magic;
1674
        /**
1675
         * crc32 for the previous xlog tx, not used now
1676
         */
1677
        uint32_t crc32p;
1678
        /**
1679
         * crc32 for current xlog tx
1680
         */
1681
        uint32_t crc32c;
1682
        /**
1683
         * xlog tx data length excluding fixheader
1684
         */
1685
        uint32_t len;
1686
};
1687

1688
/**
1689
 * Decode xlog tx header, set up magic, crc32c and len
1690
 *
1691
 * @retval 0 for success
1692
 * @retval -1 for error
1693
 * @retval count of bytes left to parse header
1694
 */
1695
static ssize_t
1696
xlog_fixheader_decode(struct xlog_fixheader *fixheader,
676,717✔
1697
                      const char **data, const char *data_end)
1698
{
1699
        if (data_end - *data < (ptrdiff_t)XLOG_FIXHEADER_SIZE)
676,717✔
1700
                return XLOG_FIXHEADER_SIZE - (data_end - *data);
676,625✔
1701
        const char *pos = *data;
676,712✔
1702
        const char *end = pos + XLOG_FIXHEADER_SIZE;
676,712✔
1703

1704
        /* Decode magic */
1705
        fixheader->magic = load_u32(pos);
676,712!
1706
        if (fixheader->magic != row_marker &&
676,708✔
1707
            fixheader->magic != zrow_marker) {
15,600!
1708
                diag_set(XlogError, "invalid magic: 0x%x", fixheader->magic);
×
1709
                return -1;
×
1710
        }
1711
        pos += sizeof(fixheader->magic);
676,708✔
1712

1713
        /* Read length */
1714
        const char *val = pos;
676,708✔
1715
        if (pos >= end || mp_check(&pos, end) != 0 ||
676,708!
1716
            mp_typeof(*val) != MP_UINT) {
1,353,400!
1717
                diag_set(XlogError, "broken fixheader length");
×
1718
                return -1;
×
1719
        }
1720
        fixheader->len = mp_decode_uint(&val);
676,705!
1721
        assert(val == pos);
676,692!
1722
        if (fixheader->len > IPROTO_BODY_LEN_MAX) {
676,692!
1723
                diag_set(XlogError, "too large fixheader length");
×
1724
                return -1;
×
1725
        }
1726

1727
        /* Read previous crc32 */
1728
        if (pos >= end || mp_check(&pos, end) != 0 ||
676,692!
1729
            mp_typeof(*val) != MP_UINT) {
1,353,350!
1730
                diag_set(XlogError, "broken fixheader crc32p");
×
1731
                return -1;
×
1732
        }
1733
        fixheader->crc32p = mp_decode_uint(&val);
676,676!
1734
        assert(val == pos);
676,663!
1735

1736
        /* Read current crc32 */
1737
        if (pos >= end || mp_check(&pos, end) != 0 ||
676,663!
1738
            mp_typeof(*val) != MP_UINT) {
1,353,290!
1739
                diag_set(XlogError, "broken fixheader crc32c");
×
1740
                return -1;
×
1741
        }
1742
        fixheader->crc32c = mp_decode_uint(&val);
676,647!
1743
        assert(val == pos);
676,642!
1744

1745
        /* Check and skip padding if any */
1746
        if (pos < end && (mp_check(&pos, end) != 0 || pos != end)) {
676,642!
1747
                diag_set(XlogError, "broken fixheader padding");
×
1748
                return -1;
×
1749
        }
1750

1751
        assert(pos == end);
676,620!
1752
        *data = end;
676,620✔
1753
        return 0;
676,620✔
1754
}
1755

1756
int
1757
xlog_tx_decode(const char *data, const char *data_end,
476,984✔
1758
               char *rows, char *rows_end, ZSTD_DStream *zdctx)
1759
{
1760
        /* Decode fixheader */
1761
        struct xlog_fixheader fixheader;
1762
        if (xlog_fixheader_decode(&fixheader, &data, data_end) != 0)
476,984!
1763
                return -1;
476,973✔
1764

1765
        /* Check that buffer has enough bytes */
1766
        if (data + fixheader.len != data_end) {
476,951!
1767
                diag_set(XlogError, "invalid compressed length: "
×
1768
                          "expected %zd, got %u",
1769
                          data_end - data, fixheader.len);
1770
                return -1;
×
1771
        }
1772

1773
        ERROR_INJECT(ERRINJ_XLOG_GARBAGE, {
476,951!
1774
                *((char *)data + fixheader.len / 2) = ~*((char *)data + fixheader.len / 2);
1775
        });
1776

1777
        /* Validate checksum */
1778
        if (crc32_calc(0, data, fixheader.len) != fixheader.crc32c) {
476,951!
1779
                diag_set(XlogError, "tx checksum mismatch");
1!
1780
                return -1;
1✔
1781
        }
1782

1783
        /* Copy uncompressed rows */
1784
        if (fixheader.magic == row_marker) {
476,960✔
1785
                if (rows_end - rows != (ptrdiff_t)fixheader.len) {
469,411!
1786
                        diag_set(XlogError, "invalid unpacked length: "
×
1787
                                  "expected %zd, got %u",
1788
                                  rows_end - data, fixheader.len);
1789
                        return -1;
×
1790
                }
1791
                memcpy(rows, data, fixheader.len);
469,411✔
1792
                return 0;
469,411✔
1793
        }
1794

1795
        /* Decompress zstd rows */
1796
        assert(fixheader.magic == zrow_marker);
7,549!
1797
        ZSTD_initDStream(zdctx);
7,549!
1798
        int rc = xlog_cursor_decompress(&rows, rows_end, &data, data_end,
7,559!
1799
                                        zdctx);
1800
        if (rc < 0) {
7,561!
1801
                return -1;
×
1802
        } else if (rc > 0) {
7,561!
1803
                diag_set(XlogError, "invalid decompressed length: "
×
1804
                          "expected %zd, got %zd", rows_end - data,
1805
                           rows_end - data + XLOG_TX_AUTOCOMMIT_THRESHOLD);
1806
                return -1;
×
1807
        }
1808

1809
        assert(data == data_end);
7,561!
1810
        return 0;
7,561✔
1811
}
1812

1813
/**
1814
 * @retval -1 error
1815
 * @retval 0 success
1816
 * @retval >0 how many bytes we will have for continue
1817
 */
1818
ssize_t
1819
xlog_tx_cursor_create(struct xlog_tx_cursor *tx_cursor,
199,732✔
1820
                      const char **data, const char *data_end,
1821
                      ZSTD_DStream *zdctx)
1822
{
1823
        const char *rpos = *data;
199,732✔
1824
        struct xlog_fixheader fixheader;
1825
        ssize_t to_load;
1826
        to_load = xlog_fixheader_decode(&fixheader, &rpos, data_end);
199,732!
1827
        if (to_load != 0)
199,673✔
1828
                return to_load;
199,679✔
1829

1830
        /* Check that buffer has enough bytes */
1831
        if ((data_end - rpos) < (ptrdiff_t)fixheader.len)
199,668✔
1832
                return fixheader.len - (data_end - rpos);
81✔
1833

1834
        ERROR_INJECT(ERRINJ_XLOG_GARBAGE, {
199,587!
1835
                *((char *)rpos + fixheader.len / 2) = ~*((char *)rpos + fixheader.len / 2);
1836
        });
1837

1838
        /* Validate checksum */
1839
        if (crc32_calc(0, rpos, fixheader.len) != fixheader.crc32c) {
199,587!
1840
                diag_set(XlogError, "tx checksum mismatch");
5!
1841
                return -1;
5✔
1842
        }
1843
        data_end = rpos + fixheader.len;
199,621✔
1844

1845
        ibuf_create(&tx_cursor->rows, &cord()->slabc,
199,621!
1846
                    XLOG_TX_AUTOCOMMIT_THRESHOLD);
1847
        if (fixheader.magic == row_marker) {
199,623✔
1848
                void *dst = ibuf_alloc(&tx_cursor->rows, fixheader.len);
191,627!
1849
                if (dst == NULL) {
191,590!
1850
                        diag_set(OutOfMemory, fixheader.len,
×
1851
                                 "runtime", "xlog rows buffer");
1852
                        ibuf_destroy(&tx_cursor->rows);
×
1853
                        return -1;
×
1854
                }
1855
                memcpy(dst, rpos, fixheader.len);
191,590✔
1856
                *data = (char *)rpos + fixheader.len;
191,590✔
1857
                assert(*data <= data_end);
191,590!
1858
                tx_cursor->size = ibuf_used(&tx_cursor->rows);
191,590!
1859
                return 0;
191,592✔
1860
        };
1861

1862
        assert(fixheader.magic == zrow_marker);
7,996!
1863
        ZSTD_initDStream(zdctx);
7,996!
1864
        int rc;
1865
        do {
1866
                if (ibuf_reserve(&tx_cursor->rows,
8,036!
1867
                                 XLOG_TX_AUTOCOMMIT_THRESHOLD) == NULL) {
1868
                        diag_set(OutOfMemory, XLOG_TX_AUTOCOMMIT_THRESHOLD,
×
1869
                                  "runtime", "xlog output buffer");
1870
                        ibuf_destroy(&tx_cursor->rows);
×
1871
                        return -1;
×
1872
                }
1873
        } while ((rc = xlog_cursor_decompress(&tx_cursor->rows.wpos,
8,036!
1874
                                              tx_cursor->rows.end, &rpos,
1875
                                              data_end, zdctx)) == 1);
8,036✔
1876
        if (rc != 0)
7,996!
1877
                return -1;
×
1878

1879
        *data = rpos;
7,996✔
1880
        assert(*data <= data_end);
7,996!
1881
        tx_cursor->size = ibuf_used(&tx_cursor->rows);
7,996!
1882
        return 0;
7,996✔
1883
}
1884

1885
int
1886
xlog_tx_cursor_next_row(struct xlog_tx_cursor *tx_cursor,
2,421,570✔
1887
                        struct xrow_header *xrow)
1888
{
1889
        if (ibuf_used(&tx_cursor->rows) == 0)
2,421,570✔
1890
                return 1;
198,125✔
1891
        /* Return row from xlog tx buffer */
1892
        int rc = xrow_decode(xrow, (const char **)&tx_cursor->rows.rpos,
4,446,720✔
1893
                             (const char *)tx_cursor->rows.wpos, false);
2,223,420✔
1894
        if (rc != 0) {
2,223,300✔
1895
                diag_set(XlogError, "can't parse row");
2!
1896
                /* Discard remaining row data */
1897
                ibuf_reset(&tx_cursor->rows);
3✔
1898
                return -1;
3✔
1899
        }
1900

1901
        return 0;
2,223,300✔
1902
}
1903

1904
int
1905
xlog_tx_cursor_next_row_raw(struct xlog_tx_cursor *tx_cursor,
22,884✔
1906
                            const char ***data, const char **end)
1907
{
1908
        if (ibuf_used(&tx_cursor->rows) == 0)
22,884✔
1909
                return 1;
199✔
1910
        *data = (const char **)&tx_cursor->rows.rpos;
22,685✔
1911
        *end = (const char *)tx_cursor->rows.wpos;
22,685✔
1912
        return 0;
22,685✔
1913
}
1914

1915
int
1916
xlog_tx_cursor_destroy(struct xlog_tx_cursor *tx_cursor)
199,480✔
1917
{
1918
        assert(tx_cursor->rows.slabc == &cord()->slabc);
199,480!
1919
        ibuf_destroy(&tx_cursor->rows);
199,480✔
1920
        return 0;
199,529✔
1921
}
1922

1923
/**
1924
 * Find a next xlog tx magic
1925
 */
1926
int
1927
xlog_cursor_find_tx_magic(struct xlog_cursor *i)
5✔
1928
{
1929
        assert(xlog_cursor_is_open(i));
5!
1930
        log_magic_t magic;
1931
        do {
1932
                /*
1933
                 * Read one extra byte to start searching from the next
1934
                 * byte.
1935
                 */
1936
                int rc = xlog_cursor_ensure(i, sizeof(log_magic_t) + 1);
24,265✔
1937
                if (rc < 0)
24,265!
1938
                        return -1;
×
1939
                if (rc == 1)
24,265✔
1940
                        return 1;
1✔
1941

1942
                ++i->rbuf.rpos;
24,264✔
1943
                assert(i->rbuf.rpos + sizeof(log_magic_t) <= i->rbuf.wpos);
24,264!
1944
                magic = load_u32(i->rbuf.rpos);
24,264✔
1945
        } while (magic != row_marker && magic != zrow_marker);
24,264✔
1946

1947
        return 0;
4✔
1948
}
1949

1950
int
1951
xlog_cursor_next_tx(struct xlog_cursor *i)
237,464✔
1952
{
1953
        int rc;
1954
        assert(xlog_cursor_is_open(i));
237,464!
1955
        if (i->state == XLOG_CURSOR_TX) {
237,454✔
1956
                i->state = XLOG_CURSOR_ACTIVE;
198,323✔
1957
                xlog_tx_cursor_destroy(&i->tx_cursor);
198,323✔
1958
        }
1959
        /* load at least magic to check eof */
1960
        rc = xlog_cursor_ensure(i, sizeof(log_magic_t));
237,502✔
1961
        if (rc < 0)
237,504!
1962
                return -1;
×
1963
        if (rc > 0)
237,504✔
1964
                return 1;
31,377✔
1965
        if (load_u32(i->rbuf.rpos) == eof_marker) {
206,127✔
1966
                /* eof marker found */
1967
                goto eof_found;
6,506✔
1968
        }
1969

1970
        ssize_t to_load;
1971
        while ((to_load = xlog_tx_cursor_create(&i->tx_cursor,
399,329✔
1972
                                                (const char **)&i->rbuf.rpos,
199,733✔
1973
                                                i->rbuf.wpos, i->zdctx)) > 0) {
199,733✔
1974
                /* not enough data in read buffer */
1975
                int rc = xlog_cursor_ensure(i, ibuf_used(&i->rbuf) + to_load);
86✔
1976
                if (rc < 0)
86!
1977
                        return -1;
×
1978
                if (rc > 0)
86✔
1979
                        return 1;
5✔
1980
        }
1981
        if (to_load < 0)
199,591✔
1982
                return -1;
5✔
1983

1984
        i->state = XLOG_CURSOR_TX;
199,586✔
1985
        return 0;
199,586✔
1986
eof_found:
6,506✔
1987
        /*
1988
         * A eof marker is read, check that there is no
1989
         * more data in the file.
1990
         */
1991
        rc = xlog_cursor_ensure(i, sizeof(log_magic_t) + sizeof(char));
6,506✔
1992

1993
        if (rc < 0)
6,506!
1994
                return -1;
×
1995
        if (rc == 0) {
6,506✔
1996
                diag_set(XlogError, "%s: has some data after "
2!
1997
                          "eof marker at %lld", i->name,
1998
                          xlog_cursor_pos(i));
1999
                return -1;
2✔
2000
        }
2001
        i->state = XLOG_CURSOR_EOF;
6,504✔
2002
        return 1;
6,504✔
2003
}
2004

2005
int
2006
xlog_cursor_next_row(struct xlog_cursor *cursor, struct xrow_header *xrow)
2,459,530✔
2007
{
2008
        assert(xlog_cursor_is_open(cursor));
2,459,530!
2009
        if (cursor->state != XLOG_CURSOR_TX)
2,459,560✔
2010
                return 1;
37,994✔
2011
        return xlog_tx_cursor_next_row(&cursor->tx_cursor, xrow);
2,421,570✔
2012
}
2013

2014
int
2015
xlog_cursor_next_row_raw(struct xlog_cursor *cursor,
23,821✔
2016
                         const char ***data, const char **end)
2017
{
2018
        assert(xlog_cursor_is_open(cursor));
23,821!
2019
        if (cursor->state != XLOG_CURSOR_TX)
23,821✔
2020
                return 1;
937✔
2021
        return xlog_tx_cursor_next_row_raw(&cursor->tx_cursor, data, end);
22,884✔
2022
}
2023

2024
int
2025
xlog_cursor_next(struct xlog_cursor *cursor,
2,235,660✔
2026
                 struct xrow_header *xrow, bool force_recovery)
2027
{
2028
        assert(xlog_cursor_is_open(cursor));
2,235,660!
2029
        while (true) {
197,607✔
2030
                int rc;
2031
                rc = xlog_cursor_next_row(cursor, xrow);
2,433,250✔
2032
                if (rc == 0)
2,432,990✔
2033
                        break;
2,197,630✔
2034
                if (rc < 0) {
235,363✔
2035
                        struct error *e = diag_last_error(diag_get());
3✔
2036
                        if (!force_recovery ||
3✔
2037
                            e->type != &type_XlogError)
1!
2038
                                return -1;
2✔
2039
                        say_error("can't decode row: %s", e->errmsg);
1!
2040
                }
2041
                while ((rc = xlog_cursor_next_tx(cursor)) < 0) {
235,426!
2042
                        struct error *e = diag_last_error(diag_get());
2043
                        if (!force_recovery ||
4✔
2044
                            e->type != &type_XlogError)
2!
2045
                                return -1;
2✔
2046
                        say_error("can't open tx: %s", e->errmsg);
2!
2047
                        if ((rc = xlog_cursor_find_tx_magic(cursor)) < 0)
2!
2048
                                return -1;
×
2049
                        if (rc > 0)
2✔
2050
                                break;
1✔
2051
                }
2052
                if (rc == 1)
235,432✔
2053
                        return 1;
37,825✔
2054
        }
2055
        return 0;
2,197,630✔
2056
}
2057

2058
int
2059
xlog_cursor_openfd(struct xlog_cursor *i, int fd, const char *name)
18,443✔
2060
{
2061
        memset(i, 0, sizeof(*i));
18,443✔
2062
        i->fd = fd;
18,443✔
2063
        i->read_ahead = XLOG_READ_AHEAD_MIN;
18,443✔
2064
        ibuf_create(&i->rbuf, &cord()->slabc,
18,443!
2065
                    XLOG_TX_AUTOCOMMIT_THRESHOLD << 1);
2066

2067
        ssize_t rc;
2068
        /*
2069
         * we can have eof here, but this is no error,
2070
         * because we don't know exact meta size
2071
         */
2072
        rc = xlog_cursor_ensure(i, XLOG_META_LEN_MAX);
18,442✔
2073
        if (rc == -1)
18,443!
2074
                goto error;
×
2075
        rc = xlog_meta_parse(&i->meta,
36,886✔
2076
                             (const char **)&i->rbuf.rpos,
18,443✔
2077
                             (const char *)i->rbuf.wpos);
18,443✔
2078
        if (rc == -1)
18,443✔
2079
                goto error;
1✔
2080
        if (rc > 0) {
18,442✔
2081
                diag_set(XlogError, "Unexpected end of file, run with 'force_recovery = true'");
12!
2082
                goto error;
12✔
2083
        }
2084
        snprintf(i->name, sizeof(i->name), "%s", name);
18,430✔
2085
        i->zdctx = ZSTD_createDStream();
18,430✔
2086
        if (i->zdctx == NULL) {
18,430!
2087
                diag_set(ClientError, ER_DECOMPRESSION,
×
2088
                         "failed to create context");
2089
                goto error;
×
2090
        }
2091
        i->state = XLOG_CURSOR_ACTIVE;
18,430✔
2092
        return 0;
18,430✔
2093
error:
13✔
2094
        ibuf_destroy(&i->rbuf);
13✔
2095
        return -1;
13✔
2096
}
2097

2098
int
2099
xlog_cursor_open(struct xlog_cursor *i, const char *name)
2,285✔
2100
{
2101
        int fd = open(name, O_RDONLY);
2,285✔
2102
        if (fd < 0) {
2,285✔
2103
                diag_set(SystemError, "failed to open '%s' file", name);
5!
2104
                return -1;
5✔
2105
        }
2106
        int rc = xlog_cursor_openfd(i, fd, name);
2,280✔
2107
        if (rc < 0) {
2,280✔
2108
                close(fd);
1✔
2109
                return -1;
1✔
2110
        }
2111
        return 0;
2,279✔
2112
}
2113

2114
int
2115
xlog_cursor_openmem(struct xlog_cursor *i, const char *data, size_t size,
2,878✔
2116
                    const char *name)
2117
{
2118
        memset(i, 0, sizeof(*i));
2,878✔
2119
        i->fd = -1;
2,878✔
2120
        ibuf_create(&i->rbuf, &cord()->slabc,
2,878!
2121
                    XLOG_TX_AUTOCOMMIT_THRESHOLD << 1);
2122

2123
        void *dst = ibuf_alloc(&i->rbuf, size);
2,878✔
2124
        if (dst == NULL) {
2,878!
2125
                diag_set(OutOfMemory, size, "runtime",
×
2126
                          "xlog cursor read buffer");
2127
                goto error;
×
2128
        }
2129
        memcpy(dst, data, size);
2,878✔
2130
        i->read_offset = size;
2,878✔
2131
        int rc;
2132
        rc = xlog_meta_parse(&i->meta,
5,756✔
2133
                             (const char **)&i->rbuf.rpos,
2,878✔
2134
                             (const char *)i->rbuf.wpos);
2,878✔
2135
        if (rc < 0)
2,878!
2136
                goto error;
×
2137
        if (rc > 0) {
2,878!
2138
                diag_set(XlogError, "Unexpected end of file");
×
2139
                goto error;
×
2140
        }
2141
        snprintf(i->name, sizeof(i->name), "%s", name);
2,878✔
2142
        i->zdctx = ZSTD_createDStream();
2,878✔
2143
        if (i->zdctx == NULL) {
2,878!
2144
                diag_set(ClientError, ER_DECOMPRESSION,
×
2145
                         "failed to create context");
2146
                goto error;
×
2147
        }
2148
        i->state = XLOG_CURSOR_ACTIVE;
2,878✔
2149
        return 0;
2,878✔
2150
error:
×
2151
        ibuf_destroy(&i->rbuf);
×
2152
        return -1;
×
2153
}
2154

2155
void
2156
xlog_cursor_close(struct xlog_cursor *i, bool reuse_fd)
21,248✔
2157
{
2158
        assert(xlog_cursor_is_open(i));
21,248!
2159
        if (i->fd >= 0 && !reuse_fd)
21,248✔
2160
                close(i->fd);
18,140✔
2161
        assert(i->rbuf.slabc == &cord()->slabc);
21,248!
2162
        ibuf_destroy(&i->rbuf);
21,248✔
2163
        if (i->state == XLOG_CURSOR_TX)
21,246✔
2164
                xlog_tx_cursor_destroy(&i->tx_cursor);
1,158✔
2165
        ZSTD_freeDStream(i->zdctx);
21,246✔
2166
        i->state = (i->state == XLOG_CURSOR_EOF ?
42,492✔
2167
                    XLOG_CURSOR_EOF_CLOSED : XLOG_CURSOR_CLOSED);
21,246✔
2168
        /*
2169
         * Do not trash the cursor object since the caller might
2170
         * still want to access its state and/or meta information.
2171
         */
2172
}
21,246✔
2173

2174
/* }}} */
2175

2176
/* {{{ xlog_remove_file */
2177

2178
/** xlog_remove_file() coio task. */
2179
struct xlog_remove_file_task {
2180
        /** Base class. */
2181
        struct coio_task base;
2182
        /** Path to the file. */
2183
        char *filename;
2184
        /** Bitwise combination of XLOG_RM_* flags. */
2185
        unsigned flags;
2186
};
2187

2188
/** Default implementation of xlog_remove_file_impl(). */
2189
static int
2190
xlog_remove_file_impl_default(const char *filename, bool *existed)
10,023✔
2191
{
2192
        if (unlink(filename) != 0) {
10,023✔
2193
                if (errno != ENOENT) {
4,545!
2194
                        diag_set(SystemError, "failed to unlink file '%s'",
×
2195
                                 filename);
2196
                        return -1;
×
2197
                }
2198
                *existed = false;
4,545✔
2199
        } else {
2200
                *existed = true;
5,478✔
2201
        }
2202
        return 0;
10,023✔
2203
}
2204

2205
xlog_remove_file_impl_f xlog_remove_file_impl = xlog_remove_file_impl_default;
2206

2207
/** Blocking implementation of xlog_remove_file(). */
2208
static bool
2209
xlog_remove_file_blocking(const char *filename, unsigned flags)
10,022✔
2210
{
2211
        bool existed;
2212
        if (xlog_remove_file_impl(filename, &existed) != 0) {
10,022!
2213
                diag_log();
×
2214
                diag_clear(diag_get());
×
2215
                say_error("error while removing %s", filename);
×
2216
                return false;
10,023✔
2217
        }
2218
        if (existed && (flags & XLOG_RM_VERBOSE) != 0)
10,023✔
2219
                say_info("removed %s", filename);
5,444!
2220
        return true;
10,023✔
2221
}
2222

2223
static int
2224
xlog_remove_file_cb(struct coio_task *base)
2,486✔
2225
{
2226
        struct xlog_remove_file_task *task =
2,486✔
2227
                (struct xlog_remove_file_task *)base;
2228
        xlog_remove_file_blocking(task->filename, task->flags);
2,486✔
2229
        return 0;
2,488✔
2230
}
2231

2232
static int
2233
xlog_remove_file_done_cb(struct coio_task *base)
2,489✔
2234
{
2235
        struct xlog_remove_file_task *task =
2,489✔
2236
                (struct xlog_remove_file_task *)base;
2237
        TRASH(task);
2,489✔
2238
        free(task);
2,489✔
2239
        return 0;
2,489✔
2240
}
2241

2242
/** Asynchronous implementation of xlog_remove_file(). */
2243
static bool
2244
xlog_remove_file_async(const char *filename, unsigned flags)
2,489✔
2245
{
2246
        struct xlog_remove_file_task *task;
2247
        struct grp_alloc all = grp_alloc_initializer();
2,489!
2248
        grp_alloc_reserve_data(&all, sizeof(*task));
2,489!
2249
        grp_alloc_reserve_str0(&all, filename);
2,489!
2250
        grp_alloc_use(&all, xmalloc(grp_alloc_size(&all)));
2,489!
2251
        task = grp_alloc_create_data(&all, sizeof(*task));
2,489!
2252
        task->filename = grp_alloc_create_str0(&all, filename);
2,489!
2253
        task->flags = flags;
2,489✔
2254
        assert(grp_alloc_size(&all) == 0);
2,489!
2255
        coio_task_create(&task->base, xlog_remove_file_cb,
2,489!
2256
                         xlog_remove_file_done_cb);
2257
        coio_task_post(&task->base);
2,489!
2258
        return true;
2,489✔
2259
}
2260

2261
bool
2262
xlog_remove_file(const char *filename, unsigned flags)
10,024✔
2263
{
2264
        if (flags & XLOG_RM_ASYNC)
10,024✔
2265
                return xlog_remove_file_async(filename, flags);
2,489✔
2266
        else
2267
                return xlog_remove_file_blocking(filename, flags);
7,535✔
2268
}
2269

2270
/* }}} */
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc