• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

FluentDo / agent / 20435534704

22 Dec 2025 02:59PM UTC coverage: 56.273%. Remained the same
20435534704

Pull #170

github

web-flow
Merge 383e4038c into dedf52ee6
Pull Request #170: fix: resolve missing dependencies for Windows Kafka SASL support

17769 of 33640 branches covered (52.82%)

Branch coverage included in aggregate %.

86712 of 152029 relevant lines covered (57.04%)

5781.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.1
source/src/flb_input_chunk.c
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2

3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2024 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19

20
/*
 * Trace helpers for the per-output 'fs_chunks_size' counter.
 *
 * FS_CHUNK_SIZE_DEBUG(op)                 traces the output name and its
 *                                         current fs_chunks_size.
 * FS_CHUNK_SIZE_DEBUG_MOD(op, chunk, mod) additionally traces the size
 *                                         delta 'mod' and the chunk name.
 *
 * Both expand to a single statement (do/while(0)) so they can be used as
 * the body of an unbraced if/else, and every argument is parenthesized so
 * expressions can be passed safely. A trailing ';' is required at the
 * call site.
 */
#define FS_CHUNK_SIZE_DEBUG(op)                                             \
    do {                                                                    \
        flb_trace("[%d] %s -> fs_chunks_size = %zu",                        \
                  __LINE__, (op)->name, (op)->fs_chunks_size);              \
    } while (0)

#define FS_CHUNK_SIZE_DEBUG_MOD(op, chunk, mod)                             \
    do {                                                                    \
        flb_trace("[%d] %s -> fs_chunks_size = %zu mod=%zd chunk=%s",       \
                  __LINE__, (op)->name, (op)->fs_chunks_size, (mod),        \
                  flb_input_chunk_get_name(chunk));                         \
    } while (0)
25

26
#include <fluent-bit/flb_info.h>
27
#include <fluent-bit/flb_config.h>
28
#include <fluent-bit/flb_input.h>
29
#include <fluent-bit/flb_input_chunk.h>
30
#include <fluent-bit/flb_input_plugin.h>
31
#include <fluent-bit/flb_storage.h>
32
#include <fluent-bit/flb_time.h>
33
#include <fluent-bit/flb_lib.h>
34
#include <fluent-bit/flb_router.h>
35
#include <fluent-bit/flb_task.h>
36
#include <fluent-bit/flb_routes_mask.h>
37
#include <fluent-bit/flb_metrics.h>
38
#include <fluent-bit/stream_processor/flb_sp.h>
39
#include <fluent-bit/flb_ring_buffer.h>
40
#include <chunkio/chunkio.h>
41
#include <monkey/mk_core.h>
42

43

44

45

46
/*
 * Debugging aid: block the calling thread until one byte can be read from
 * file descriptor 0. The read result is intentionally discarded; the macro
 * expands to a single statement and needs a trailing ';'.
 */
#define BLOCK_UNTIL_KEYPRESS()                                              \
    do {                                                                    \
        char    temp_keypress_buffer;                                       \
        ssize_t temp_keypress_ret;                                          \
        temp_keypress_ret = read(0, &temp_keypress_buffer, 1);              \
        (void) temp_keypress_ret;                                           \
    } while (0)
47

48
#define FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL  0
49
#define FLB_INPUT_CHUNK_RELEASE_SCOPE_GLOBAL 1
50

51
struct input_chunk_raw {
52
    struct flb_input_instance *ins;
53
    int event_type;
54
    size_t records;
55
    flb_sds_t tag;
56
    void *buf_data;
57
    size_t buf_size;
58
};
59

60
#ifdef FLB_HAVE_IN_STORAGE_BACKLOG
61

62
extern ssize_t sb_get_releasable_output_queue_space(struct flb_output_instance *output_plugin,
63
                                                    size_t                      required_space);
64

65
extern int sb_release_output_queue_space(struct flb_output_instance *output_plugin,
66
                                         ssize_t                    *required_space);
67

68

69
#else
70

71
/*
 * Stub compiled when FLB_HAVE_IN_STORAGE_BACKLOG is not defined: both
 * arguments are ignored and 0 is always returned.
 */
ssize_t sb_get_releasable_output_queue_space(struct flb_output_instance *output_plugin,
                                             size_t                      required_space)
{
    (void) output_plugin;
    (void) required_space;

    return 0;
}
76

77
/*
 * Stub compiled when FLB_HAVE_IN_STORAGE_BACKLOG is not defined: nothing is
 * released, '*required_space' is left untouched and 0 is always returned.
 */
int sb_release_output_queue_space(struct flb_output_instance *output_plugin,
                                  ssize_t                    *required_space)
{
    (void) output_plugin;
    (void) required_space;

    return 0;
}
82

83
#endif
84

85
static int flb_input_chunk_safe_delete(struct flb_input_chunk *ic,
86
                                       struct flb_input_chunk *old_ic,
87
                                       uint64_t o_id);
88

89
static int flb_input_chunk_is_task_safe_delete(struct flb_task *task);
90

91
static int flb_input_chunk_drop_task_route(
92
                struct flb_task *task,
93
                struct flb_output_instance *o_ins,
94
                ssize_t *dropped_record_count);
95

96
static ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic);
97

98
static ssize_t get_input_chunk_record_count(struct flb_input_chunk *input_chunk)
4✔
99
{
100
    ssize_t record_count;
4✔
101
    char   *chunk_buffer;
4✔
102
    size_t  chunk_size;
4✔
103
    int     set_down;
4✔
104
    int     ret;
4✔
105

106
    ret = cio_chunk_is_up(input_chunk->chunk);
4✔
107
    set_down = FLB_FALSE;
4✔
108

109
    if (ret == CIO_FALSE) {
4✔
110
        ret = cio_chunk_up_force(input_chunk->chunk);
4✔
111

112
        if (ret == -1) {
4✔
113
            return -1;
114
        }
115

116
        set_down = FLB_TRUE;
117
    }
118

119
    ret = cio_chunk_get_content(input_chunk->chunk,
4✔
120
                                &chunk_buffer,
121
                                &chunk_size);
122

123
    if (ret == CIO_OK) {
4✔
124
        record_count = flb_mp_count(chunk_buffer, chunk_size);
4✔
125
    }
126
    else {
127
        record_count = -1;
128
    }
129

130
    if (set_down) {
4✔
131
        cio_chunk_down(input_chunk->chunk);
4✔
132
    }
133

134
    return record_count;
135
}
136

137
static int flb_input_chunk_release_space(
23✔
138
                    struct flb_input_chunk     *new_input_chunk,
139
                    struct flb_input_instance  *input_plugin,
140
                    struct flb_output_instance *output_plugin,
141
                    ssize_t                    *required_space,
142
                    int                         release_scope)
143
{
144
    struct mk_list         *input_chunk_iterator_tmp;
23✔
145
    struct mk_list         *input_chunk_iterator;
23✔
146
    ssize_t                 dropped_record_count;
23✔
147
    int                     chunk_destroy_flag;
23✔
148
    struct flb_input_chunk *old_input_chunk;
23✔
149
    ssize_t                 released_space;
23✔
150
    int                     chunk_released;
23✔
151
    ssize_t                 chunk_size;
23✔
152

153
    released_space = 0;
23✔
154

155
    mk_list_foreach_safe(input_chunk_iterator, input_chunk_iterator_tmp,
28✔
156
                         &input_plugin->chunks) {
157
        old_input_chunk = mk_list_entry(input_chunk_iterator,
9✔
158
                                             struct flb_input_chunk, _head);
159

160
        if (!flb_routes_mask_get_bit(old_input_chunk->routes_mask,
9✔
161
                                     output_plugin->id,
162
                                     input_plugin->config)) {
163
            continue;
×
164
        }
165

166
        if (flb_input_chunk_safe_delete(new_input_chunk,
9✔
167
                                        old_input_chunk,
168
                                        output_plugin->id) == FLB_FALSE) {
9✔
169
            continue;
5✔
170
        }
171

172
        if (flb_input_chunk_drop_task_route(old_input_chunk->task,
4✔
173
                                            output_plugin,
174
                                            &dropped_record_count) == FLB_FALSE) {
175
            continue;
×
176
        }
177

178
        chunk_size = flb_input_chunk_get_real_size(old_input_chunk);
4✔
179
        chunk_released = FLB_FALSE;
4✔
180
        chunk_destroy_flag = FLB_FALSE;
4✔
181

182
        if (release_scope == FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL) {
4✔
183
            flb_routes_mask_clear_bit(old_input_chunk->routes_mask,
4✔
184
                                      output_plugin->id,
185
                                      input_plugin->config);
186

187
            FS_CHUNK_SIZE_DEBUG_MOD(output_plugin, old_input_chunk, chunk_size);
4✔
188
            output_plugin->fs_chunks_size -= chunk_size;
4✔
189

190
            chunk_destroy_flag = flb_routes_mask_is_empty(
4✔
191
                                                old_input_chunk->routes_mask,
192
                                                input_plugin->config);
193

194
            chunk_released = FLB_TRUE;
4✔
195
        }
196
        else if (release_scope == FLB_INPUT_CHUNK_RELEASE_SCOPE_GLOBAL) {
×
197
            chunk_destroy_flag = FLB_TRUE;
×
198
        }
199

200
#ifdef FLB_HAVE_METRICS
201
        if (dropped_record_count == 0) {
4✔
202
            dropped_record_count = get_input_chunk_record_count(old_input_chunk);
4✔
203

204
            if (dropped_record_count == -1) {
4✔
205
                flb_debug("[task] error getting chunk record count : %s",
×
206
                          old_input_chunk->in->name);
207
            }
208
            else {
209
                cmt_counter_add(output_plugin->cmt_dropped_records,
4✔
210
                                cfl_time_now(),
211
                                dropped_record_count,
212
                                1, (char *[]) {(char *) flb_output_name(output_plugin)});
4✔
213

214
                flb_metrics_sum(FLB_METRIC_OUT_DROPPED_RECORDS,
4✔
215
                                dropped_record_count,
216
                                output_plugin->metrics);
217
            }
218
        }
219
#endif
220

221
        if (chunk_destroy_flag) {
4✔
222
            if (old_input_chunk->task != NULL) {
4✔
223
                /*
224
                 * If the chunk is referenced by a task and task has no active route,
225
                 * we need to destroy the task as well.
226
                 */
227
                if (old_input_chunk->task->users == 0) {
4✔
228
                    flb_debug("[task] drop task_id %d with no active route from input plugin %s",
4✔
229
                              old_input_chunk->task->id, new_input_chunk->in->name);
230
                    flb_task_destroy(old_input_chunk->task, FLB_TRUE);
4✔
231

232
                    chunk_released = FLB_TRUE;
4✔
233
                }
234
            }
235
            else {
236
                flb_debug("[input chunk] drop chunk %s with no output route from input plugin %s",
×
237
                          flb_input_chunk_get_name(old_input_chunk), new_input_chunk->in->name);
238

239
                flb_input_chunk_destroy(old_input_chunk, FLB_TRUE);
×
240

241
                chunk_released = FLB_TRUE;
×
242
            }
243
        }
244

245
        if (chunk_released) {
4✔
246
            released_space += chunk_size;
4✔
247
        }
248

249
        if (released_space >= *required_space) {
4✔
250
            break;
251
        }
252
    }
253

254
    *required_space -= released_space;
23✔
255

256
    return 0;
23✔
257
}
258

259
static void generate_chunk_name(struct flb_input_instance *in,
975✔
260
                                char *out_buf, int buf_size)
261
{
262
    struct flb_time tm;
975✔
263
    (void) in;
975✔
264

265
    flb_time_get(&tm);
975✔
266
    snprintf(out_buf, buf_size - 1,
975✔
267
             "%i-%lu.%4lu.flb",
268
             getpid(),
269
             tm.tm.tv_sec, tm.tm.tv_nsec);
270
}
975✔
271

272
ssize_t flb_input_chunk_get_size(struct flb_input_chunk *ic)
2,274✔
273
{
274
    return cio_chunk_get_content_size(ic->chunk);
2,274✔
275
}
276

277
/*
278
 * When chunk is set to DOWN from memory, data_size is set to 0 and
279
 * cio_chunk_get_content_size(1) returns the data_size. fs_chunks_size
280
 * is used to track the size of chunks in filesystem so we need to call
281
 * cio_chunk_get_real_size to return the original size in the file system
282
 */
283
static ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic)
2,963✔
284
{
285
    ssize_t meta_size;
2,963✔
286
    ssize_t size;
2,963✔
287

288
    size = cio_chunk_get_real_size(ic->chunk);
2,963✔
289

290
    if (size != 0) {
2,963✔
291
        return size;
292
    }
293

294
    // Real size is not synced to chunk yet
295
    size = flb_input_chunk_get_size(ic);
×
296
    if (size == 0) {
×
297
        flb_debug("[input chunk] no data in the chunk %s",
×
298
                  flb_input_chunk_get_name(ic));
299
        return -1;
×
300
    }
301

302
    meta_size = cio_meta_size(ic->chunk);
×
303
    size += meta_size
×
304
        /* See https://github.com/edsiper/chunkio#file-layout for more details */
305
         + 2    /* HEADER BYTES */
306
         + 4    /* CRC32 */
307
         + 16   /* PADDING */
308
         + 2;   /* METADATA LENGTH BYTES */
×
309

310
    return size;
×
311
}
312

313
int flb_input_chunk_write(void *data, const char *buf, size_t len)
1,019✔
314
{
315
    int ret;
1,019✔
316
    struct flb_input_chunk *ic;
1,019✔
317

318
    ic = (struct flb_input_chunk *) data;
1,019✔
319

320
    ret = cio_chunk_write(ic->chunk, buf, len);
1,019✔
321
    return ret;
1,019✔
322
}
323

324
int flb_input_chunk_write_at(void *data, off_t offset,
×
325
                             const char *buf, size_t len)
326
{
327
    int ret;
×
328
    struct flb_input_chunk *ic;
×
329

330
    ic = (struct flb_input_chunk *) data;
×
331

332
    ret = cio_chunk_write_at(ic->chunk, offset, buf, len);
×
333
    return ret;
×
334
}
335

336
static int flb_input_chunk_drop_task_route(
4✔
337
            struct flb_task *task,
338
            struct flb_output_instance *output_plugin,
339
            ssize_t *dropped_record_count)
340
{
341
    int route_status;
4✔
342
    int result;
4✔
343

344
    *dropped_record_count = 0;
4✔
345

346
    if (task == NULL) {
4✔
347
        return FLB_TRUE;
348
    }
349

350
    result = FLB_TRUE;
4✔
351

352
    if (task->users != 0) {
4✔
353
        result = FLB_FALSE;
×
354

355
        if (output_plugin != NULL) {
×
356
            flb_task_acquire_lock(task);
×
357

358
            route_status = flb_task_get_route_status(task, output_plugin);
×
359

360
            if (route_status == FLB_TASK_ROUTE_INACTIVE) {
×
361
                flb_task_set_route_status(task,
×
362
                                          output_plugin,
363
                                          FLB_TASK_ROUTE_DROPPED);
364

365
                *dropped_record_count = (ssize_t) task->records;
×
366

367
                result = FLB_TRUE;
×
368
            }
369

370
            flb_task_release_lock(task);
×
371
        }
372
    }
373

374
    return result;
375
}
376

377

378
/*
379
 * For input_chunk referenced by an outgoing task, we need to check
380
 * whether the chunk is in the middle of output flush callback
381
 */
382
static int flb_input_chunk_is_task_safe_delete(struct flb_task *task)
×
383
{
384
    if (!task) {
×
385
        return FLB_TRUE;
386
    }
387

388
    if (task->users != 0) {
×
389
        return FLB_FALSE;
×
390
    }
391

392
    return FLB_TRUE;
393
}
394

395
static int flb_input_chunk_safe_delete(struct flb_input_chunk *ic,
9✔
396
                                       struct flb_input_chunk *old_ic,
397
                                       uint64_t o_id)
398
{
399
    /* The chunk we want to drop should not be the incoming chunk */
400
    if (ic == old_ic) {
9✔
401
        return FLB_FALSE;
402
    }
403

404
    /*
405
     * Even if chunks from same input plugin have same routes_mask when created,
406
     * the routes_mask could be modified when new chunks is ingested. Therefore,
407
     * we still need to do the validation on the routes_mask with o_id.
408
     */
409
    if (flb_routes_mask_get_bit(old_ic->routes_mask,
4✔
410
                                o_id,
411
                                ic->in->config) == 0) {
4✔
412
        return FLB_FALSE;
413
    }
414

415
    return FLB_TRUE;
416
}
417

418
int flb_input_chunk_release_space_compound(
9✔
419
                        struct flb_input_chunk *new_input_chunk,
420
                        struct flb_output_instance *output_plugin,
421
                        size_t *local_release_requirement,
422
                        int release_local_space)
423
{
424
    ssize_t                    required_space_remainder;
9✔
425
    struct flb_input_instance *storage_backlog_instance;
9✔
426
    struct flb_input_instance *input_plugin_instance;
9✔
427
    struct mk_list            *iterator;
9✔
428
    int                        result;
9✔
429

430
    storage_backlog_instance = output_plugin->config->storage_input_plugin;
9✔
431

432
    *local_release_requirement = flb_input_chunk_get_real_size(new_input_chunk);
9✔
433
    required_space_remainder = (ssize_t) *local_release_requirement;
9✔
434

435
    if (required_space_remainder > 0) {
9✔
436
        result = flb_input_chunk_release_space(new_input_chunk,
9✔
437
                                               storage_backlog_instance,
438
                                               output_plugin,
439
                                               &required_space_remainder,
440
                                               FLB_INPUT_CHUNK_RELEASE_SCOPE_GLOBAL);
441
    }
442

443
    if (required_space_remainder > 0) {
9✔
444
        result = sb_release_output_queue_space(output_plugin,
9✔
445
                                               &required_space_remainder);
446
    }
447

448
    if (release_local_space) {
9✔
449
        if (required_space_remainder > 0) {
9✔
450
            result = flb_input_chunk_release_space(new_input_chunk,
9✔
451
                                                   new_input_chunk->in,
452
                                                   output_plugin,
453
                                                   &required_space_remainder,
454
                                                   FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL);
455
        }
456
    }
457

458
    if (required_space_remainder > 0) {
9✔
459
        mk_list_foreach(iterator, &output_plugin->config->inputs) {
15✔
460
            input_plugin_instance = \
10✔
461
                mk_list_entry(iterator, struct flb_input_instance, _head);
10✔
462

463
            if (input_plugin_instance != new_input_chunk->in) {
10✔
464
                result = flb_input_chunk_release_space(
5✔
465
                            new_input_chunk,
466
                            input_plugin_instance,
467
                            output_plugin,
468
                            &required_space_remainder,
469
                            FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL);
470
            }
471

472
            if (required_space_remainder <= 0) {
10✔
473
                break;
474
            }
475
        }
476
    }
477

478
    if (required_space_remainder < 0) {
9✔
479
        required_space_remainder = 0;
×
480
    }
481

482
    *local_release_requirement = (size_t) required_space_remainder;
9✔
483

484
    (void) result;
9✔
485

486
    return 0;
9✔
487
}
488

489
/*
490
 * Find a slot in the output instance to append the new data with size chunk_size, it
491
 * will drop the the oldest chunks when the limitation on local disk is reached.
492
 */
493
int flb_input_chunk_find_space_new_data(struct flb_input_chunk *ic,
9✔
494
                                        size_t chunk_size, int overlimit)
495
{
496
    int count;
9✔
497
    int result;
9✔
498
    struct mk_list *head;
9✔
499
    struct flb_output_instance *o_ins;
9✔
500
    size_t local_release_requirement;
9✔
501

502
    /*
503
     * For each output instances that will be over the limit after adding the new chunk,
504
     * we have to determine how many chunks needs to be removed. We will adjust the
505
     * routes_mask to only route to the output plugin that have enough space after
506
     * deleting some chunks fome the queue.
507
     */
508
    count = 0;
9✔
509

510
    mk_list_foreach(head, &ic->in->config->outputs) {
18✔
511
        o_ins = mk_list_entry(head, struct flb_output_instance, _head);
9✔
512

513
        if ((o_ins->total_limit_size == -1) || ((1 << o_ins->id) & overlimit) == 0 ||
18✔
514
           (flb_routes_mask_get_bit(ic->routes_mask,
9✔
515
                                    o_ins->id,
516
                                    o_ins->config) == 0)) {
517
            continue;
×
518
        }
519

520
        local_release_requirement = 0;
9✔
521

522
        result = flb_input_chunk_release_space_compound(
9✔
523
                                            ic, o_ins,
524
                                            &local_release_requirement,
525
                                            FLB_TRUE);
526

527
        if (result != 0 ||
9✔
528
            local_release_requirement != 0) {
9✔
529
            count++;
5✔
530
        }
531
    }
532

533
    if (count != 0) {
9✔
534
        flb_error("[input chunk] fail to drop enough chunks in order to place "
5✔
535
                  "new data coming from input plugin %s", flb_input_name(ic->in));
536
    }
537

538
    return count;
9✔
539
}
540

541
/*
542
 * Returns a non-zero result if any output instances will reach the limit
543
 * after buffering the new data
544
 */
545
int flb_input_chunk_has_overlimit_routes(struct flb_input_chunk *ic,
13✔
546
                                         size_t chunk_size)
547
{
548
    int overlimit = 0;
13✔
549
    struct mk_list *head;
13✔
550
    struct flb_output_instance *o_ins;
13✔
551

552
    mk_list_foreach(head, &ic->in->config->outputs) {
26✔
553
        o_ins = mk_list_entry(head, struct flb_output_instance, _head);
13✔
554

555
        if ((o_ins->total_limit_size == -1) ||
26✔
556
            (flb_routes_mask_get_bit(ic->routes_mask,
13✔
557
                                     o_ins->id,
558
                                     o_ins->config) == 0)) {
559
            continue;
×
560
        }
561

562
        FS_CHUNK_SIZE_DEBUG(o_ins);
13✔
563
        flb_trace("[input chunk] chunk %s required %ld bytes and %ld bytes left "
13✔
564
                  "in plugin %s", flb_input_chunk_get_name(ic), chunk_size,
565
                  o_ins->total_limit_size -
566
                  o_ins->fs_backlog_chunks_size -
567
                  o_ins->fs_chunks_size,
568
                  o_ins->name);
569

570
        if ((o_ins->fs_chunks_size +
13✔
571
             o_ins->fs_backlog_chunks_size +
13✔
572
             chunk_size) > o_ins->total_limit_size) {
13✔
573
            overlimit |= (1 << o_ins->id);
9✔
574
        }
575
    }
576

577
    return overlimit;
13✔
578
}
579

580
/* Find a slot for the incoming data to buffer it in local file system
581
 * returns 0 if none of the routes can be written to
582
 */
583
int flb_input_chunk_place_new_chunk(struct flb_input_chunk *ic, size_t chunk_size)
1,071✔
584
{
585
    int result;
1,071✔
586
        int overlimit;
1,071✔
587
    struct flb_input_instance *i_ins = ic->in;
1,071✔
588

589
    if (i_ins->storage_type == CIO_STORE_FS) {
1,071✔
590
        overlimit = flb_input_chunk_has_overlimit_routes(ic, chunk_size);
13✔
591
        if (overlimit != 0) {
13✔
592
            result = flb_input_chunk_find_space_new_data(ic, chunk_size, overlimit);
9✔
593

594
            if (result != 0) {
9✔
595
                return 0;
596
            }
597
        }
598
    }
599
    return !flb_routes_mask_is_empty(ic->routes_mask,
1,066✔
600
                                     i_ins->config);
601
}
602

603
/* Create an input chunk using a Chunk I/O */
604
struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in,
×
605
                                            int event_type,
606
                                            void *chunk)
607
{
608
    int records = 0;
×
609
    int tag_len;
×
610
    int has_routes;
×
611
    int ret;
×
612
    uint64_t ts;
×
613
    char *buf_data;
×
614
    size_t buf_size;
×
615
    size_t offset;
×
616
    ssize_t bytes;
×
617
    const char *tag_buf;
×
618
    struct flb_input_chunk *ic;
×
619

620
    /* Create context for the input instance */
621
    ic = flb_calloc(1, sizeof(struct flb_input_chunk));
×
622
    if (!ic) {
×
623
        flb_errno();
×
624
        return NULL;
×
625
    }
626
    ic->event_type = event_type;
×
627
    ic->busy = FLB_FALSE;
×
628
    ic->fs_counted = FLB_FALSE;
×
629
    ic->fs_backlog = FLB_TRUE;
×
630
    ic->chunk = chunk;
×
631
    ic->in = in;
×
632
    msgpack_packer_init(&ic->mp_pck, ic, flb_input_chunk_write);
×
633

634
    ret = cio_chunk_get_content(ic->chunk, &buf_data, &buf_size);
×
635
    if (ret != CIO_OK) {
×
636
        flb_error("[input chunk] error retrieving content for metrics");
×
637
        flb_free(ic);
×
638
        return NULL;
×
639
    }
640

641
    ic->routes_mask = (flb_route_mask_element *)
×
642
                            flb_calloc(in->config->route_mask_size,
×
643
                                       sizeof(flb_route_mask_element));
644

645
    if (ic->routes_mask == NULL) {
×
646
        flb_errno();
×
647
        cio_chunk_close(chunk, CIO_TRUE);
×
648
        flb_free(ic);
×
649
        return NULL;
×
650
    }
651

652
    if (ic->event_type == FLB_INPUT_LOGS) {
×
653
        /* Validate records in the chunk */
654
        ret = flb_mp_validate_log_chunk(buf_data, buf_size, &records, &offset);
×
655
        if (ret == -1) {
×
656
            /* If there are valid records, truncate the chunk size */
657
            if (records <= 0) {
×
658
                flb_plg_error(in,
×
659
                              "chunk validation failed, data might be corrupted. "
660
                              "No valid records found, the chunk will be discarded.");
661
                flb_free(ic->routes_mask);
×
662
                flb_free(ic);
×
663
                return NULL;
×
664
            }
665
            if (records > 0 && offset > 32) {
×
666
                flb_plg_warn(in,
×
667
                             "chunk validation failed, data might be corrupted. "
668
                             "Found %d valid records, failed content starts "
669
                             "right after byte %lu. Recovering valid records.",
670
                             records, offset);
671

672
                /* truncate the chunk to recover valid records */
673
                cio_chunk_write_at(chunk, offset, NULL, 0);
×
674
            }
675
            else {
676
                flb_plg_error(in,
×
677
                              "chunk validation failed, data might be corrupted. "
678
                              "Found %d valid records, failed content starts "
679
                              "right after byte %lu. Cannot recover chunk,",
680
                              records, offset);
681
                flb_free(ic->routes_mask);
×
682
                flb_free(ic);
×
683
                return NULL;
×
684
            }
685
        }
686
    }
687
    else if (ic->event_type == FLB_INPUT_METRICS) {
×
688
        ret = flb_mp_validate_metric_chunk(buf_data, buf_size, &records, &offset);
×
689
        if (ret == -1) {
×
690
            if (records <= 0) {
×
691
                flb_plg_error(in,
×
692
                              "metrics chunk validation failed, data might be corrupted. "
693
                              "No valid records found, the chunk will be discarded.");
694
                flb_free(ic->routes_mask);
×
695
                flb_free(ic);
×
696
                return NULL;
×
697
            }
698
            if (records > 0 && offset > 32) {
×
699
                flb_plg_warn(in,
×
700
                             "metrics chunk validation failed, data might be corrupted. "
701
                             "Found %d valid records, failed content starts "
702
                             "right after byte %lu. Recovering valid records.",
703
                             records, offset);
704

705
                /* truncate the chunk to recover valid records */
706
                cio_chunk_write_at(chunk, offset, NULL, 0);
×
707
            }
708
            else {
709
                flb_plg_error(in,
×
710
                              "metrics chunk validation failed, data might be corrupted. "
711
                              "Found %d valid records, failed content starts "
712
                              "right after byte %lu. Cannot recover chunk,",
713
                              records, offset);
714
                flb_free(ic->routes_mask);
×
715
                flb_free(ic);
×
716
                return NULL;
×
717
            }
718

719
        }
720
    }
721
    else if (ic->event_type == FLB_INPUT_TRACES) {
722

723
    }
×
724

725
    /* Skip chunks without content data */
726
    if (records == 0) {
×
727
        flb_plg_error(in,
×
728
                      "chunk validation failed, data might be corrupted. "
729
                      "No valid records found, the chunk will be discarded.");
730
        flb_free(ic->routes_mask);
×
731
        flb_free(ic);
×
732
        return NULL;
×
733
    }
734

735
    /*
736
     * If the content is valid and the chunk has extra padding zeros, just
737
     * perform an adjustment.
738
     */
739
    bytes = cio_chunk_get_content_size(chunk);
×
740
    if (bytes == -1) {
×
741
        flb_free(ic->routes_mask);
×
742
        flb_free(ic);
×
743
        return NULL;
×
744
    }
745
    if (offset < bytes) {
×
746
        cio_chunk_write_at(chunk, offset, NULL, 0);
×
747
    }
748

749
    /* Update metrics */
750
#ifdef FLB_HAVE_METRICS
751
    ic->total_records = records;
×
752
    if (ic->total_records > 0) {
×
753
        /* timestamp */
754
        ts = cfl_time_now();
×
755

756
        /* fluentbit_input_records_total */
757
        cmt_counter_add(in->cmt_records, ts, ic->total_records,
×
758
                        1, (char *[]) {(char *) flb_input_name(in)});
×
759

760
        /* fluentbit_input_bytes_total */
761
        cmt_counter_add(in->cmt_bytes, ts, buf_size,
×
762
                        1, (char *[]) {(char *) flb_input_name(in)});
×
763

764
        /* OLD metrics */
765
        flb_metrics_sum(FLB_METRIC_N_RECORDS, ic->total_records, in->metrics);
×
766
        flb_metrics_sum(FLB_METRIC_N_BYTES, buf_size, in->metrics);
×
767
    }
768
#endif
769

770
    /* Get the the tag reference (chunk metadata) */
771
    ret = flb_input_chunk_get_tag(ic, &tag_buf, &tag_len);
×
772
    if (ret == -1) {
×
773
        flb_error("[input chunk] error retrieving tag of input chunk");
×
774
        flb_free(ic->routes_mask);
×
775
        flb_free(ic);
×
776
        return NULL;
×
777
    }
778

779
    bytes = flb_input_chunk_get_real_size(ic);
×
780
    if (bytes < 0) {
×
781
        flb_warn("[input chunk] could not retrieve chunk real size");
×
782
        flb_free(ic->routes_mask);
×
783
        flb_free(ic);
×
784
        return NULL;
×
785
    }
786

787
    has_routes = flb_routes_mask_set_by_tag(ic->routes_mask, tag_buf, tag_len, in);
×
788
    if (has_routes == 0) {
×
789
        flb_warn("[input chunk] no matching route for backoff log chunk %s",
×
790
                 flb_input_chunk_get_name(ic));
791
    }
792

793
    mk_list_add(&ic->_head, &in->chunks);
×
794

795
    flb_input_chunk_update_output_instances(ic, bytes);
×
796

797
    return ic;
×
798
}
799

800
static int input_chunk_write_header(struct cio_chunk *chunk, int event_type,
974✔
801
                                    char *tag, int tag_len)
802

803
{
804
    int ret;
974✔
805
    int meta_size;
974✔
806
    char *meta;
974✔
807

808
    /*
809
     * Prepare the Chunk metadata header
810
     * ----------------------------------
811
     * m[0] = FLB_INPUT_CHUNK_MAGIC_BYTE_0
812
     * m[1] = FLB_INPUT_CHUNK_MAGIC_BYTE_1
813
     * m[2] = type (FLB_INPUT_CHUNK_TYPE_LOG or FLB_INPUT_CHUNK_TYPE_METRIC or FLB_INPUT_CHUNK_TYPE_TRACE
814
     * m[3] = 0 (unused for now)
815
     */
816

817
    /* write metadata (tag) */
818
    if (tag_len > (65535 - FLB_INPUT_CHUNK_META_HEADER)) {
974✔
819
        /* truncate length */
820
        tag_len = 65535 - FLB_INPUT_CHUNK_META_HEADER;
821
    }
822
    meta_size = FLB_INPUT_CHUNK_META_HEADER + tag_len;
974✔
823

824
    /* Allocate buffer for metadata header */
825
    meta = flb_calloc(1, meta_size);
974✔
826
    if (!meta) {
974✔
827
        flb_errno();
×
828
        return -1;
×
829
    }
830

831
    /*
832
     * Write chunk header in a temporary buffer
833
     * ----------------------------------------
834
     */
835

836
    /* magic bytes */
837
    meta[0] = FLB_INPUT_CHUNK_MAGIC_BYTE_0;
974✔
838
    meta[1] = FLB_INPUT_CHUNK_MAGIC_BYTE_1;
974✔
839

840
    /* event type */
841
    if (event_type == FLB_INPUT_LOGS) {
974✔
842
        meta[2] = FLB_INPUT_CHUNK_TYPE_LOGS;
941✔
843
    }
844
    else if (event_type == FLB_INPUT_METRICS) {
33✔
845
        meta[2] = FLB_INPUT_CHUNK_TYPE_METRICS;
33✔
846
    }
847
    else if (event_type == FLB_INPUT_TRACES) {
×
848
        meta[2] = FLB_INPUT_CHUNK_TYPE_TRACES;
×
849
    }
850
    else if (event_type == FLB_INPUT_PROFILES) {
×
851
        meta[2] = FLB_INPUT_CHUNK_TYPE_PROFILES;
×
852
    }
853

854
    /* unused byte */
855
    meta[3] = 0;
974✔
856

857
    /* copy the tag after magic bytes */
858
    memcpy(meta + FLB_INPUT_CHUNK_META_HEADER, tag, tag_len);
974✔
859

860
    /* Write tag into metadata section */
861
    ret = cio_meta_write(chunk, (char *) meta, meta_size);
974✔
862
    if (ret == -1) {
974✔
863
        flb_error("[input chunk] could not write metadata");
×
864
        flb_free(meta);
×
865
        return -1;
×
866
    }
867
    flb_free(meta);
974✔
868

869
    return 0;
974✔
870
}
871

872
struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, int event_type,
975✔
873
                                               const char *tag, int tag_len)
874
{
875
    int ret;
975✔
876
    int err;
975✔
877
    int set_down = FLB_FALSE;
975✔
878
    int has_routes;
975✔
879
    char name[64];
975✔
880
    struct cio_chunk *chunk;
975✔
881
    struct flb_storage_input *storage;
975✔
882
    struct flb_input_chunk *ic;
975✔
883

884
    storage = in->storage;
975✔
885

886
    /* chunk name */
887
    generate_chunk_name(in, name, sizeof(name) - 1);
975✔
888

889
    /* open/create target chunk file */
890
    chunk = cio_chunk_open(storage->cio, storage->stream, name,
975✔
891
                           CIO_OPEN, FLB_INPUT_CHUNK_SIZE, &err);
892
    if (!chunk) {
975✔
893
        flb_error("[input chunk] could not create chunk file: %s:%s",
1✔
894
                  storage->stream->name, name);
895
        return NULL;
1✔
896
    }
897
    /*
898
     * If the returned chunk at open is 'down', just put it up, write the
899
     * content and set it down again.
900
     */
901
    ret = cio_chunk_is_up(chunk);
974✔
902
    if (ret == CIO_FALSE) {
974✔
903
        ret = cio_chunk_up_force(chunk);
×
904
        if (ret == -1) {
×
905
            cio_chunk_close(chunk, CIO_TRUE);
×
906
            return NULL;
×
907
        }
908
        set_down = FLB_TRUE;
909
    }
910

911
    /* Write chunk header */
912
    ret = input_chunk_write_header(chunk, event_type, (char *) tag, tag_len);
974✔
913
    if (ret == -1) {
974✔
914
        cio_chunk_close(chunk, CIO_TRUE);
×
915
        return NULL;
×
916
    }
917

918
    /* Create context for the input instance */
919
    ic = flb_calloc(1, sizeof(struct flb_input_chunk));
974✔
920
    if (!ic) {
974✔
921
        flb_errno();
×
922
        cio_chunk_close(chunk, CIO_TRUE);
×
923
        return NULL;
×
924
    }
925

926
    /*
927
     * Check chunk content type to be created: depending of the value set by
928
     * the input plugin, this can be FLB_INPUT_LOGS, FLB_INPUT_METRICS or
929
     * FLB_INPUT_TRACES.
930
     */
931
    ic->event_type = event_type;
974✔
932
    ic->busy = FLB_FALSE;
974✔
933
    ic->fs_counted = FLB_FALSE;
974✔
934
    ic->chunk = chunk;
974✔
935
    ic->fs_backlog = FLB_FALSE;
974✔
936
    ic->in = in;
974✔
937
    ic->stream_off = 0;
974✔
938
    ic->task = NULL;
974✔
939
    ic->create_time = flb_time_now();
974✔
940
#ifdef FLB_HAVE_METRICS
941
    ic->total_records = 0;
974✔
942
#endif
943
    ic->routes_mask = (flb_route_mask_element *)
974✔
944
                            flb_calloc(in->config->route_mask_size,
974✔
945
                                       sizeof(flb_route_mask_element));
946

947
    if (ic->routes_mask == NULL) {
974✔
948
        flb_errno();
×
949
        cio_chunk_close(chunk, CIO_TRUE);
×
950
        flb_free(ic);
×
951
        return NULL;
×
952
    }
953

954

955
    /* Calculate the routes_mask for the input chunk */
956
    has_routes = flb_routes_mask_set_by_tag(ic->routes_mask, tag, tag_len, in);
974✔
957
    if (has_routes == 0) {
974✔
958
        flb_trace("[input chunk] no matching route for input chunk '%s' with tag '%s'",
974✔
959
                  flb_input_chunk_get_name(ic), tag);
960
    }
961

962
    msgpack_packer_init(&ic->mp_pck, ic, flb_input_chunk_write);
974✔
963
    mk_list_add(&ic->_head, &in->chunks);
974✔
964

965
    if (set_down == FLB_TRUE) {
974✔
966
        cio_chunk_down(chunk);
×
967
    }
968

969
    if (event_type == FLB_INPUT_LOGS) {
974✔
970
        flb_hash_table_add(in->ht_log_chunks, tag, tag_len, ic, 0);
941✔
971
    }
972
    else if (event_type == FLB_INPUT_METRICS) {
33✔
973
        flb_hash_table_add(in->ht_metric_chunks, tag, tag_len, ic, 0);
33✔
974
    }
975
    else if (event_type == FLB_INPUT_TRACES) {
×
976
        flb_hash_table_add(in->ht_trace_chunks, tag, tag_len, ic, 0);
×
977
    }
978
    else if (event_type == FLB_INPUT_PROFILES) {
×
979
        flb_hash_table_add(in->ht_profile_chunks, tag, tag_len, ic, 0);
×
980
    }
981

982
    return ic;
983
}
984

985
int flb_input_chunk_destroy_corrupted(struct flb_input_chunk *ic,
×
986
                                      const char *tag_buf, int tag_len,
987
                                      int del)
988
{
989
    ssize_t bytes;
×
990
    struct mk_list *head;
×
991
    struct flb_output_instance *o_ins;
×
992

993
    mk_list_foreach(head, &ic->in->config->outputs) {
×
994
        o_ins = mk_list_entry(head, struct flb_output_instance, _head);
×
995

996
        if (o_ins->total_limit_size == -1) {
×
997
            continue;
×
998
        }
999

1000
        bytes = flb_input_chunk_get_real_size(ic);
×
1001
        if (bytes == -1) {
×
1002
            // no data in the chunk
1003
            continue;
×
1004
        }
1005

1006
        if (flb_routes_mask_get_bit(ic->routes_mask,
×
1007
                                    o_ins->id,
1008
                                    o_ins->config) != 0) {
1009
            if (ic->fs_counted == FLB_TRUE) {
×
1010
                FS_CHUNK_SIZE_DEBUG_MOD(o_ins, ic, -bytes);
×
1011
                o_ins->fs_chunks_size -= bytes;
×
1012
                flb_debug("[input chunk] remove chunk %s with %ld bytes from plugin %s, "
×
1013
                          "the updated fs_chunks_size is %ld bytes", flb_input_chunk_get_name(ic),
1014
                          bytes, o_ins->name, o_ins->fs_chunks_size);
1015
            }
1016
        }
1017
    }
1018

1019
    if (del == CIO_TRUE && tag_buf) {
×
1020
        /*
1021
         * "TRY" to delete any reference to this chunk ('ic') from the hash
1022
         * table. Note that maybe the value is not longer available in the
1023
         * entries if it was replaced: note that we always keep the last
1024
         * chunk for a specific Tag.
1025
         */
1026
        if (ic->event_type == FLB_INPUT_LOGS) {
×
1027
            flb_hash_table_del_ptr(ic->in->ht_log_chunks,
×
1028
                                   tag_buf, tag_len, (void *) ic);
1029
        }
1030
        else if (ic->event_type == FLB_INPUT_METRICS) {
×
1031
            flb_hash_table_del_ptr(ic->in->ht_metric_chunks,
×
1032
                                   tag_buf, tag_len, (void *) ic);
1033
        }
1034
        else if (ic->event_type == FLB_INPUT_TRACES) {
×
1035
            flb_hash_table_del_ptr(ic->in->ht_trace_chunks,
×
1036
                                   tag_buf, tag_len, (void *) ic);
1037
        }
1038
        else if (ic->event_type == FLB_INPUT_PROFILES) {
×
1039
            flb_hash_table_del_ptr(ic->in->ht_profile_chunks,
×
1040
                                   tag_buf, tag_len, (void *) ic);
1041
        }
1042
    }
1043

1044

1045
    cio_chunk_close(ic->chunk, del);
×
1046
    mk_list_del(&ic->_head);
×
1047

1048
    if (ic->routes_mask != NULL) {
×
1049
        flb_free(ic->routes_mask);
×
1050
        ic->routes_mask = NULL;
×
1051
    }
1052

1053
    flb_free(ic);
×
1054

1055
    return 0;
×
1056
}
1057

1058

1059
int flb_input_chunk_destroy(struct flb_input_chunk *ic, int del)
974✔
1060
{
1061
    int tag_len;
974✔
1062
    int ret;
974✔
1063
    ssize_t bytes;
974✔
1064
    const char *tag_buf = NULL;
974✔
1065
    struct mk_list *head;
974✔
1066
    struct flb_output_instance *o_ins;
974✔
1067

1068
    if (flb_input_chunk_is_up(ic) == FLB_FALSE) {
974✔
1069
        flb_input_chunk_set_up(ic);
4✔
1070
    }
1071

1072
    mk_list_foreach(head, &ic->in->config->outputs) {
2,227✔
1073
        o_ins = mk_list_entry(head, struct flb_output_instance, _head);
1,253✔
1074

1075
        if (o_ins->total_limit_size == -1) {
1,253✔
1076
            continue;
1,246✔
1077
        }
1078

1079
        bytes = flb_input_chunk_get_real_size(ic);
7✔
1080
        if (bytes == -1) {
7✔
1081
            // no data in the chunk
1082
            continue;
×
1083
        }
1084

1085
        if (flb_routes_mask_get_bit(ic->routes_mask,
7✔
1086
                                    o_ins->id,
1087
                                    o_ins->config) != 0) {
1088
            if (ic->fs_counted == FLB_TRUE) {
3✔
1089
                FS_CHUNK_SIZE_DEBUG_MOD(o_ins, ic, -bytes);
3✔
1090
                o_ins->fs_chunks_size -= bytes;
3✔
1091
                flb_debug("[input chunk] remove chunk %s with %ld bytes from plugin %s, "
3✔
1092
                          "the updated fs_chunks_size is %ld bytes", flb_input_chunk_get_name(ic),
1093
                          bytes, o_ins->name, o_ins->fs_chunks_size);
1094
            }
1095
        }
1096
    }
1097

1098
    /*
1099
     * When a chunk is going to be destroyed, this can be in a down state,
1100
     * since the next step is to retrieve the Tag we need to have the
1101
     * content up.
1102
     */
1103
    ret = flb_input_chunk_is_up(ic);
974✔
1104
    if (ret == FLB_FALSE) {
974✔
1105
        ret = cio_chunk_up_force(ic->chunk);
×
1106
        if (ret == -1) {
×
1107
            flb_error("[input chunk] cannot load chunk: %s",
×
1108
                      flb_input_chunk_get_name(ic));
1109
        }
1110
    }
1111

1112
    /* Retrieve Tag */
1113
    ret = flb_input_chunk_get_tag(ic, &tag_buf, &tag_len);
974✔
1114
    if (ret == -1) {
974✔
1115
        flb_trace("[input chunk] could not retrieve chunk tag: %s",
974✔
1116
                  flb_input_chunk_get_name(ic));
1117
    }
1118

1119
    if (del == CIO_TRUE && tag_buf) {
974✔
1120
        /*
1121
         * "TRY" to delete any reference to this chunk ('ic') from the hash
1122
         * table. Note that maybe the value is not longer available in the
1123
         * entries if it was replaced: note that we always keep the last
1124
         * chunk for a specific Tag.
1125
         */
1126
        if (ic->event_type == FLB_INPUT_LOGS) {
972✔
1127
            flb_hash_table_del_ptr(ic->in->ht_log_chunks,
940✔
1128
                                   tag_buf, tag_len, (void *) ic);
1129
        }
1130
        else if (ic->event_type == FLB_INPUT_METRICS) {
32✔
1131
            flb_hash_table_del_ptr(ic->in->ht_metric_chunks,
32✔
1132
                                   tag_buf, tag_len, (void *) ic);
1133
        }
1134
        else if (ic->event_type == FLB_INPUT_TRACES) {
×
1135
            flb_hash_table_del_ptr(ic->in->ht_trace_chunks,
×
1136
                                   tag_buf, tag_len, (void *) ic);
1137
        }
1138
        else if (ic->event_type == FLB_INPUT_PROFILES) {
×
1139
            flb_hash_table_del_ptr(ic->in->ht_profile_chunks,
×
1140
                                   tag_buf, tag_len, (void *) ic);
1141
        }
1142
    }
1143

1144

1145
    cio_chunk_close(ic->chunk, del);
974✔
1146
    mk_list_del(&ic->_head);
974✔
1147

1148
    if (ic->routes_mask != NULL) {
974✔
1149
        flb_free(ic->routes_mask);
974✔
1150
        ic->routes_mask = NULL;
974✔
1151
    }
1152

1153
    flb_free(ic);
974✔
1154

1155
    return 0;
974✔
1156
}
1157

1158
/* Return or create an available chunk to write data */
1159
static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in,
1,143✔
1160
                                               int event_type,
1161
                                               const char *tag, int tag_len,
1162
                                               size_t chunk_size, int *set_down)
1163
{
1164
    int id = -1;
1,143✔
1165
    int ret;
1,143✔
1166
    int new_chunk = FLB_FALSE;
1,143✔
1167
    size_t out_size;
1,143✔
1168
    struct flb_input_chunk *ic = NULL;
1,143✔
1169

1170
    if (tag_len > FLB_INPUT_CHUNK_TAG_MAX) {
1,143✔
1171
        flb_plg_warn(in,
×
1172
                     "Tag set exceeds limit, truncating from %i to %i bytes",
1173
                     tag_len, FLB_INPUT_CHUNK_TAG_MAX);
1174
        tag_len = FLB_INPUT_CHUNK_TAG_MAX;
1175
    }
1176

1177
    if (event_type == FLB_INPUT_LOGS) {
1,143✔
1178
        id = flb_hash_table_get(in->ht_log_chunks, tag, tag_len,
1,086✔
1179
                                (void *) &ic, &out_size);
1180
    }
1181
    else if (event_type == FLB_INPUT_METRICS) {
57✔
1182
        id = flb_hash_table_get(in->ht_metric_chunks, tag, tag_len,
57✔
1183
                                (void *) &ic, &out_size);
1184
    }
1185
    else if (event_type == FLB_INPUT_TRACES) {
×
1186
        id = flb_hash_table_get(in->ht_trace_chunks, tag, tag_len,
×
1187
                                (void *) &ic, &out_size);
1188
    }
1189
    else if (event_type == FLB_INPUT_PROFILES) {
×
1190
        id = flb_hash_table_get(in->ht_profile_chunks, tag, tag_len,
×
1191
                                (void *) &ic, &out_size);
1192
    }
1193

1194
    if (id >= 0) {
1,143✔
1195
        if (ic->busy == FLB_TRUE || cio_chunk_is_locked(ic->chunk)) {
208✔
1196
            ic = NULL;
40✔
1197
        }
1198
        else if (cio_chunk_is_up(ic->chunk) == CIO_FALSE) {
168✔
1199
            ret = cio_chunk_up_force(ic->chunk);
×
1200

1201
            if (ret == CIO_CORRUPTED) {
×
1202
                if (in->config->storage_del_bad_chunks) {
×
1203
                    /* If the chunk is corrupted we need to discard it and
1204
                     * set ic to NULL so the system tries to allocate a new
1205
                     * chunk.
1206
                     */
1207

1208
                    flb_error("[input chunk] discarding corrupted chunk");
×
1209
                }
1210

1211
                flb_input_chunk_destroy_corrupted(ic,
×
1212
                                                  tag, tag_len,
1213
                                                  in->config->storage_del_bad_chunks);
×
1214

1215
                ic = NULL;
×
1216
            }
1217
            else if (ret != CIO_OK) {
×
1218
                ic = NULL;
×
1219
            }
1220

1221
            *set_down = FLB_TRUE;
×
1222
        }
1223
    }
1224

1225
    /* No chunk was found, we need to create a new one */
1226
    if (!ic) {
1,143✔
1227
        ic = flb_input_chunk_create(in, event_type, (char *) tag, tag_len);
975✔
1228
        new_chunk = FLB_TRUE;
975✔
1229
        if (!ic) {
975✔
1230
            return NULL;
1231
        }
1232
        ic->event_type = event_type;
974✔
1233
    }
1234

1235
    /*
1236
     * If buffering this block of data will exceed one of the limit among all output instances
1237
     * that the chunk will flush to, we need to modify the routes_mask of the oldest chunks
1238
     * (based in creation time) to get enough space for the incoming chunk.
1239
     */
1240
    if (!flb_routes_mask_is_empty(ic->routes_mask, ic->in->config)
1,142✔
1241
        && flb_input_chunk_place_new_chunk(ic, chunk_size) == 0) {
1,071✔
1242
        /*
1243
         * If the chunk is not newly created, the chunk might already have logs inside.
1244
         * We cannot delete (reused) chunks here.
1245
         * If the routes_mask is cleared after trying to append new data, we destroy
1246
         * the chunk.
1247
         */
1248
        if (new_chunk ||
10✔
1249
            flb_routes_mask_is_empty(ic->routes_mask, ic->in->config) == FLB_TRUE) {
5✔
1250
            flb_input_chunk_destroy(ic, FLB_TRUE);
×
1251
        }
1252
        return NULL;
5✔
1253
    }
1254

1255
    return ic;
1,137✔
1256
}
1257

1258
static inline int flb_input_chunk_is_mem_overlimit(struct flb_input_instance *i)
4,053✔
1259
{
1260
    if (i->mem_buf_limit <= 0) {
4,053✔
1261
        return FLB_FALSE;
1262
    }
1263

1264
    if (i->mem_chunks_size >= i->mem_buf_limit) {
292✔
1265
        return FLB_TRUE;
28✔
1266
    }
1267

1268
    return FLB_FALSE;
1269
}
1270

1271
static inline int flb_input_chunk_is_storage_overlimit(struct flb_input_instance *i)
3,011✔
1272
{
1273
    struct flb_storage_input *storage = (struct flb_storage_input *)i->storage;
3,011✔
1274

1275
    if (storage->type == FLB_STORAGE_FS) {
3,011✔
1276
        if (i->storage_pause_on_chunks_overlimit == FLB_TRUE) {
21✔
1277
            if (storage->cio->total_chunks_up >= storage->cio->max_chunks_up) {
×
1278
                return FLB_TRUE;
×
1279
            }
1280
        }
1281
    }
1282

1283
    return FLB_FALSE;
1284
}
1285

1286
/*
1287
 * Check all chunks associated to the input instance and summarize
1288
 * the number of bytes in use.
1289
 */
1290
size_t flb_input_chunk_total_size(struct flb_input_instance *in)
2,052✔
1291
{
1292
    size_t total = 0;
2,052✔
1293
    struct flb_storage_input *storage;
2,052✔
1294

1295
    storage = (struct flb_storage_input *) in->storage;
2,052✔
1296
    total = cio_stream_size_chunks_up(storage->stream);
2,052✔
1297
    return total;
2,052✔
1298
}
1299

1300
/*
1301
 * Count and update the number of bytes being used by the instance. Also
1302
 * check if the instance is paused, if so, check if it can be resumed if
1303
 * is not longer over the limits.
1304
 *
1305
 * It always returns the number of bytes in use.
1306
 */
1307
size_t flb_input_chunk_set_limits(struct flb_input_instance *in)
1,991✔
1308
{
1309
    size_t total;
1,991✔
1310

1311
    /* Gather total number of enqueued bytes */
1312
    total = flb_input_chunk_total_size(in);
1,991✔
1313

1314
    /* Register the total into the context variable */
1315
    in->mem_chunks_size = total;
1,991✔
1316

1317
    /*
1318
     * After the adjustments, validate if the plugin is overlimit or paused
1319
     * and perform further adjustments.
1320
     */
1321
    if (flb_input_chunk_is_mem_overlimit(in) == FLB_FALSE &&
1,991✔
1322
        in->config->is_running == FLB_TRUE &&
1,977✔
1323
        in->config->is_ingestion_active == FLB_TRUE &&
1,976✔
1324
        in->mem_buf_status == FLB_INPUT_PAUSED) {
1,908✔
1325
        in->mem_buf_status = FLB_INPUT_RUNNING;
14✔
1326
        if (in->p->cb_resume) {
14✔
1327
            flb_input_resume(in);
14✔
1328
            flb_info("[input] %s resume (mem buf overlimit - buf size %zuB now below limit %zuB)",
14✔
1329
                      flb_input_name(in),
1330
                      in->mem_chunks_size,
1331
                      in->mem_buf_limit);
1332
        }
1333
    }
1334
    if (flb_input_chunk_is_storage_overlimit(in) == FLB_FALSE &&
1,991✔
1335
        in->config->is_running == FLB_TRUE &&
1,991✔
1336
        in->config->is_ingestion_active == FLB_TRUE &&
1,990✔
1337
        in->storage_buf_status == FLB_INPUT_PAUSED) {
1,922✔
1338
        in->storage_buf_status = FLB_INPUT_RUNNING;
×
1339
        if (in->p->cb_resume) {
×
1340
            flb_input_resume(in);
×
1341
            flb_info("[input] %s resume (storage buf overlimit %zu/%zu)",
×
1342
                      flb_input_name(in),
1343
                      ((struct flb_storage_input *)in->storage)->cio->total_chunks_up,
1344
                      ((struct flb_storage_input *)in->storage)->cio->max_chunks_up);
1345
        }
1346
    }
1347

1348
    return total;
1,991✔
1349
}
1350

1351
/*
1352
 * If the number of bytes in use by the chunks are over the imposed limit
1353
 * by configuration, pause the instance.
1354
 */
1355
static inline int flb_input_chunk_protect(struct flb_input_instance *i, size_t just_written_size)
1,020✔
1356
{
1357
    struct flb_storage_input *storage = i->storage;
1,020✔
1358

1359
    if (flb_input_chunk_is_storage_overlimit(i) == FLB_TRUE) {
1,020✔
1360
        flb_warn("[input] %s paused (storage buf overlimit %zu/%zu)",
×
1361
                 flb_input_name(i),
1362
                 storage->cio->total_chunks_up,
1363
                 storage->cio->max_chunks_up);
1364
        flb_input_pause(i);
×
1365
        i->storage_buf_status = FLB_INPUT_PAUSED;
×
1366
        return FLB_TRUE;
×
1367
    }
1368

1369
    if (storage->type == FLB_STORAGE_FS) {
1,020✔
1370
        return FLB_FALSE;
1371
    }
1372

1373
    if (flb_input_chunk_is_mem_overlimit(i) == FLB_TRUE) {
1,012✔
1374
        /*
1375
         * if the plugin is already overlimit and the strategy is based on
1376
         * a memory-ring-buffer logic, do not pause the plugin, upon next
1377
         * try of ingestion 'memrb' will make sure to release some bytes.
1378
         */
1379
        if (i->storage_type == FLB_STORAGE_MEMRB) {
14✔
1380
            return FLB_FALSE;
1381
        }
1382

1383
        /*
1384
         * The plugin is using 'memory' buffering only and already reached
1385
         * it limit, just pause the ingestion.
1386
         */
1387
        flb_warn("[input] %s paused (mem buf overlimit - event of size %zuB exceeded limit %zu to %zuB)",
14✔
1388
                 flb_input_name(i),
1389
                 just_written_size,
1390
                 i->mem_buf_limit,
1391
                 i->mem_chunks_size
1392
                );
1393
        flb_input_pause(i);
14✔
1394
        i->mem_buf_status = FLB_INPUT_PAUSED;
14✔
1395
        return FLB_TRUE;
14✔
1396
    }
1397

1398
    return FLB_FALSE;
1399
}
1400

1401
/*
1402
 * Validate if the chunk coming from the input plugin based on config and
1403
 * resources usage must be 'up' or 'down' (applicable for filesystem storage
1404
 * type).
1405
 *
1406
 * FIXME: can we find a better name for this function ?
1407
 */
1408
int flb_input_chunk_set_up_down(struct flb_input_chunk *ic)
30✔
1409
{
1410
    size_t total;
30✔
1411
    struct flb_input_instance *in;
30✔
1412

1413
    in = ic->in;
30✔
1414

1415
    /* Gather total number of enqueued bytes */
1416
    total = flb_input_chunk_total_size(in);
30✔
1417

1418
    /* Register the total into the context variable */
1419
    in->mem_chunks_size = total;
30✔
1420

1421
    if (flb_input_chunk_is_mem_overlimit(in) == FLB_TRUE) {
30✔
1422
        if (cio_chunk_is_up(ic->chunk) == CIO_TRUE) {
×
1423
            cio_chunk_down(ic->chunk);
×
1424

1425
            /* Adjust new counters */
1426
            total = flb_input_chunk_total_size(ic->in);
×
1427
            in->mem_chunks_size = total;
×
1428

1429
            return FLB_FALSE;
×
1430
        }
1431
    }
1432

1433
    return FLB_TRUE;
1434
}
1435

1436
int flb_input_chunk_is_up(struct flb_input_chunk *ic)
3,085✔
1437
{
1438
    return cio_chunk_is_up(ic->chunk);
3,085✔
1439
}
1440

1441
int flb_input_chunk_down(struct flb_input_chunk *ic)
30✔
1442
{
1443
    if (cio_chunk_is_up(ic->chunk) == CIO_TRUE) {
30✔
1444
        return cio_chunk_down(ic->chunk);
30✔
1445
    }
1446

1447
    return 0;
1448
}
1449

1450
int flb_input_chunk_set_up(struct flb_input_chunk *ic)
30✔
1451
{
1452
    if (cio_chunk_is_up(ic->chunk) == CIO_FALSE) {
30✔
1453
        return cio_chunk_up(ic->chunk);
5✔
1454
    }
1455

1456
    return 0;
1457
}
1458

1459
static int memrb_input_chunk_release_space(struct flb_input_instance *ins,
×
1460
                                           size_t required_space,
1461
                                           size_t *dropped_chunks, size_t *dropped_bytes)
1462
{
1463
    int ret;
×
1464
    int released;
×
1465
    size_t removed_chunks = 0;
×
1466
    ssize_t chunk_size;
×
1467
    ssize_t released_space = 0;
×
1468
    struct mk_list *tmp;
×
1469
    struct mk_list *head;
×
1470
    struct flb_input_chunk *ic;
×
1471

1472
    mk_list_foreach_safe(head, tmp, &ins->chunks) {
×
1473
        ic = mk_list_entry(head, struct flb_input_chunk, _head);
×
1474

1475
        /* check if is there any task or no users associated */
1476
        ret = flb_input_chunk_is_task_safe_delete(ic->task);
×
1477
        if (ret == FLB_FALSE) {
×
1478
            continue;
×
1479
        }
1480

1481
        /* get chunk size */
1482
        chunk_size = flb_input_chunk_get_real_size(ic);
×
1483

1484
        released = FLB_FALSE;
×
1485
        if (ic->task != NULL) {
×
1486
            if (ic->task->users == 0) {
×
1487
                flb_task_destroy(ic->task, FLB_TRUE);
×
1488
                released = FLB_TRUE;
×
1489
            }
1490
        }
1491
        else {
1492
            flb_input_chunk_destroy(ic, FLB_TRUE);
×
1493
            released = FLB_TRUE;
×
1494
        }
1495

1496
        if (released) {
×
1497
            released_space += chunk_size;
×
1498
            removed_chunks++;
×
1499
        }
1500

1501
        if (released_space >= required_space) {
×
1502
            break;
1503
        }
1504
    }
1505

1506
    /* no matter if we succeeded or not, set the counters */
1507
    *dropped_bytes = released_space;
×
1508
    *dropped_chunks = removed_chunks;
×
1509

1510
    /* set the final status of the operation */
1511
    if (released_space >= required_space) {
×
1512
        return 0;
×
1513
    }
1514

1515
    return -1;
1516
}
1517

1518
/* Append a RAW MessagPack buffer to the input instance */
1519
static int input_chunk_append_raw(struct flb_input_instance *in,
1,143✔
1520
                                  int event_type,
1521
                                  size_t n_records,
1522
                                  const char *tag, size_t tag_len,
1523
                                  const void *buf, size_t buf_size)
1524
{
1525
    int ret, total_records_start;
1,143✔
1526
    int set_down = FLB_FALSE;
1,143✔
1527
    int min;
1,143✔
1528
    int new_chunk = FLB_FALSE;
1,143✔
1529
    uint64_t ts;
1,143✔
1530
    char *name;
1,143✔
1531
    size_t dropped_chunks;
1,143✔
1532
    size_t dropped_bytes;
1,143✔
1533
    size_t content_size;
1,143✔
1534
    size_t real_diff;
1,143✔
1535
    size_t real_size;
1,143✔
1536
    size_t pre_real_size;
1,143✔
1537
    struct flb_input_chunk *ic;
1,143✔
1538
    struct flb_storage_input *si;
1,143✔
1539
    void  *filtered_data_buffer;
1,143✔
1540
    size_t filtered_data_size;
1,143✔
1541
    void  *final_data_buffer;
1,143✔
1542
    size_t final_data_size;
1,143✔
1543

1544
    /* memory ring-buffer checker */
1545
    if (in->storage_type == FLB_STORAGE_MEMRB) {
1,143✔
1546
        /* check if we are overlimit */
1547
        ret = flb_input_chunk_is_mem_overlimit(in);
×
1548
        if (ret) {
×
1549
            /* reset counters */
1550
            dropped_chunks = 0;
×
1551
            dropped_bytes = 0;
×
1552

1553
            /* try to release 'buf_size' */
1554
            ret = memrb_input_chunk_release_space(in, buf_size,
×
1555
                                                  &dropped_chunks, &dropped_bytes);
1556

1557
            /* update metrics if required */
1558
            if (dropped_chunks > 0 || dropped_bytes > 0) {
×
1559
                /* timestamp and input plugin name for label */
1560
                ts = cfl_time_now();
×
1561
                name = (char *) flb_input_name(in);
×
1562

1563
                /* update counters */
1564
                cmt_counter_add(in->cmt_memrb_dropped_chunks, ts,
×
1565
                                dropped_chunks, 1, (char *[]) {name});
×
1566

1567
                cmt_counter_add(in->cmt_memrb_dropped_bytes, ts,
×
1568
                                dropped_bytes, 1, (char *[]) {name});
×
1569
            }
1570

1571
            if (ret != 0) {
×
1572
                /* we could not allocate the required space, just return */
1573
                return -1;
1574
            }
1575
        }
1576
    }
1577

1578
    /* Check if the input plugin has been paused */
1579
    if (flb_input_buf_paused(in) == FLB_TRUE) {
1,143✔
1580
        flb_debug("[input chunk] %s is paused, cannot append records",
×
1581
                  flb_input_name(in));
1582
        return -1;
×
1583
    }
1584

1585
    if (buf_size == 0) {
1,143✔
1586
        flb_debug("[input chunk] skip ingesting data with 0 bytes");
×
1587
        return -1;
×
1588
    }
1589

1590
    /*
1591
     * Some callers might not set a custom tag, on that case just inherit
1592
     * the fixed instance tag or instance name.
1593
     */
1594
    if (!tag) {
1,143✔
1595
        if (in->tag && in->tag_len > 0) {
798✔
1596
            tag = in->tag;
798✔
1597
            tag_len = in->tag_len;
798✔
1598
        }
1599
        else {
1600
            tag = in->name;
×
1601
            tag_len = strlen(in->name);
×
1602
        }
1603
    }
1604

1605
    /*
1606
     * Get a target input chunk, can be one with remaining space available
1607
     * or a new one.
1608
     */
1609
    ic = input_chunk_get(in, event_type, tag, tag_len, buf_size, &set_down);
1,143✔
1610
    if (!ic) {
1,143✔
1611
        flb_error("[input chunk] no available chunk");
6✔
1612
        return -1;
6✔
1613
    }
1614

1615
    /* newly created chunk */
1616
    if (flb_input_chunk_get_size(ic) == 0) {
1,137✔
1617
        new_chunk = FLB_TRUE;
974✔
1618
    }
1619

1620
    /* We got the chunk, validate if is 'up' or 'down' */
1621
    ret = flb_input_chunk_is_up(ic);
1,137✔
1622
    if (ret == FLB_FALSE) {
1,137✔
1623
        ret = cio_chunk_up_force(ic->chunk);
×
1624
        if (ret == -1) {
×
1625
            flb_error("[input chunk] cannot retrieve temporary chunk");
×
1626
            return -1;
×
1627
        }
1628
        set_down = FLB_TRUE;
×
1629
    }
1630

1631
    /*
1632
     * Keep the previous real size to calculate the real size
1633
     * difference for flb_input_chunk_update_output_instances(),
1634
     * use 0 when the chunk is new since it's size will never
1635
     * have been calculated before.
1636
     */
1637
    if (new_chunk == FLB_TRUE) {
1,137✔
1638
        pre_real_size = 0;
1639
    }
1640
    else {
1641
        pre_real_size = flb_input_chunk_get_real_size(ic);
163✔
1642
    }
1643

1644
    /*
1645
     * Set the total_records based on the records that n_records
1646
     * says we should be writing. These values may be overwritten
1647
     * flb_filter_do, where a filter may add/remove records.
1648
     */
1649
    total_records_start = ic->total_records;
1,137✔
1650
    ic->added_records =  n_records;
1,137✔
1651
    ic->total_records += n_records;
1,137✔
1652

1653

1654
    /* Update 'input' metrics */
1655
#ifdef FLB_HAVE_METRICS
1656
    if (ic->total_records > 0) {
1,137✔
1657
        /* timestamp */
1658
        ts = cfl_time_now();
1,080✔
1659

1660
        /* fluentbit_input_records_total */
1661
        cmt_counter_add(in->cmt_records, ts, ic->added_records,
2,160✔
1662
                        1, (char *[]) {(char *) flb_input_name(in)});
1,080✔
1663

1664
        /* fluentbit_input_bytes_total */
1665
        cmt_counter_add(in->cmt_bytes, ts, buf_size,
2,160✔
1666
                        1, (char *[]) {(char *) flb_input_name(in)});
1,080✔
1667

1668
        /* OLD api */
1669
        flb_metrics_sum(FLB_METRIC_N_RECORDS, ic->added_records, in->metrics);
1,080✔
1670
        flb_metrics_sum(FLB_METRIC_N_BYTES, buf_size, in->metrics);
1,080✔
1671
    }
1672
#endif
1673

1674
    filtered_data_buffer = NULL;
1,137✔
1675
    final_data_buffer = (char *) buf;
1,137✔
1676
    final_data_size = buf_size;
1,137✔
1677

1678
    /* Apply filters */
1679
    if (event_type == FLB_INPUT_LOGS) {
1,137✔
1680
        flb_filter_do(ic,
1,080✔
1681
                      buf, buf_size,
1682
                      &filtered_data_buffer,
1683
                      &filtered_data_size,
1684
                      tag, tag_len,
1685
                      in->config);
1686

1687
        final_data_buffer = filtered_data_buffer;
1,080✔
1688
        final_data_size = filtered_data_size;
1,080✔
1689
    }
1690

1691
    if (final_data_size > 0){
1,137✔
1692
        ret = flb_input_chunk_write(ic,
1,019✔
1693
                                    final_data_buffer,
1694
                                    final_data_size);
1695
    }
1696
    else {
1697
        ret = 0;
1698
    }
1699

1700
    if (filtered_data_buffer != NULL &&
1,137✔
1701
        filtered_data_buffer != buf) {
1702
        flb_free(filtered_data_buffer);
223✔
1703
    }
1704

1705
    /*
1706
     * If the write failed, then we did not add any records. Reset
1707
     * the record counters to reflect this.
1708
     */
1709
    if (ret != CIO_OK) {
1,137✔
1710
        ic->added_records = 0;
×
1711
        ic->total_records = total_records_start;
×
1712
    }
1713

1714
    if (ret == -1) {
×
1715
        flb_error("[input chunk] error writing data from %s instance",
×
1716
                  flb_input_name(in));
1717
        cio_chunk_tx_rollback(ic->chunk);
×
1718

1719
        return -1;
×
1720
    }
1721

1722
    /* get the chunks content size */
1723
    content_size = cio_chunk_get_content_size(ic->chunk);
1,137✔
1724

1725
    /*
1726
     * There is a case that rewrite_tag will modify the tag and keep rule is set
1727
     * to drop the original record. The original record will still go through the
1728
     * flb_input_chunk_update_output_instances(2) to update the fs_chunks_size by
1729
     * metadata bytes (consisted by metadata bytes of the file chunk). This condition
1730
     * sets the diff to 0 in order to not update the fs_chunks_size.
1731
     */
1732
    if (flb_input_chunk_get_size(ic) == 0) {
1,137✔
1733
        real_diff = 0;
1,137✔
1734
    }
1735

1736
    /* Lock buffers where size > 2MB */
1737
    if (content_size > FLB_INPUT_CHUNK_FS_MAX_SIZE) {
1,137✔
1738
        cio_chunk_lock(ic->chunk);
×
1739
    }
1740

1741
    /* Make sure the data was not filtered out and the buffer size is zero */
1742
    if (content_size == 0) {
1,137✔
1743
        flb_input_chunk_destroy(ic, FLB_TRUE);
117✔
1744
        flb_input_chunk_set_limits(in);
117✔
1745
        return 0;
117✔
1746
    }
1747
#ifdef FLB_HAVE_STREAM_PROCESSOR
1748
    else if (in->config->stream_processor_ctx &&
1749
             ic->event_type == FLB_INPUT_LOGS) {
1750
        char *c_data;
1751
        size_t c_size;
1752

1753
        /* Retrieve chunk (filtered) output content */
1754
        cio_chunk_get_content(ic->chunk, &c_data, &c_size);
1755

1756
        /* Invoke stream processor */
1757
        flb_sp_do(in->config->stream_processor_ctx,
1758
                  in,
1759
                  tag, tag_len,
1760
                  c_data + ic->stream_off, c_size - ic->stream_off);
1761
        ic->stream_off += (c_size - ic->stream_off);
1762
    }
1763
#endif
1764

1765
    if (set_down == FLB_TRUE) {
1,020✔
1766
        cio_chunk_down(ic->chunk);
×
1767
    }
1768

1769
    /*
1770
     * If the instance is not routable, there is no need to keep the
1771
     * content in the storage engine, just get rid of it.
1772
     */
1773
    if (in->routable == FLB_FALSE) {
1,020✔
1774
        flb_input_chunk_destroy(ic, FLB_TRUE);
×
1775
        return 0;
×
1776
    }
1777

1778
    /* Update memory counters and adjust limits if any */
1779
    flb_input_chunk_set_limits(in);
1,020✔
1780

1781
    /*
1782
     * Check if we are overlimit and validate if is there any filesystem
1783
     * storage type asociated to this input instance, if so, unload the
1784
     * chunk content from memory to respect imposed limits.
1785
     *
1786
     * Calling cio_chunk_down() the memory map associated and the file
1787
     * descriptor will be released. At any later time, it must be bring up
1788
     * for I/O operations.
1789
     */
1790
    si = (struct flb_storage_input *) in->storage;
1,020✔
1791
    if (flb_input_chunk_is_mem_overlimit(in) == FLB_TRUE &&
1,020✔
1792
        si->type == FLB_STORAGE_FS) {
14✔
1793
        if (cio_chunk_is_up(ic->chunk) == CIO_TRUE) {
×
1794
            /*
1795
             * If we are already over limit, a sub-sequent data ingestion
1796
             * might need a Chunk to write data in. As an optimization we
1797
             * will put this Chunk down ONLY IF it has less than 1% of
1798
             * it capacity as available space, otherwise keep it 'up' so
1799
             * it available space can be used.
1800
             */
1801
            content_size = cio_chunk_get_content_size(ic->chunk);
×
1802

1803
            /* Do we have less than 1% available ? */
1804
            min = (FLB_INPUT_CHUNK_FS_MAX_SIZE * 0.01);
×
1805
            if (FLB_INPUT_CHUNK_FS_MAX_SIZE - content_size < min) {
×
1806
                cio_chunk_down(ic->chunk);
×
1807
            }
1808
        }
1809
    }
1810

1811
    real_size = flb_input_chunk_get_real_size(ic);
1,020✔
1812
    real_diff = real_size - pre_real_size;
1,020✔
1813
    if (real_diff != 0) {
1,020✔
1814
        flb_trace("[input chunk] update output instances with new chunk size diff=%zd, records=%zu, input=%s",
1,018✔
1815
                  real_diff, n_records, flb_input_name(in));
1816
        flb_input_chunk_update_output_instances(ic, real_diff);
1,018✔
1817
    }
1818

1819

1820
    flb_input_chunk_protect(in, final_data_size);
1,020✔
1821
    return 0;
1,020✔
1822
}
1823

1824
static void destroy_chunk_raw(struct input_chunk_raw *cr)
10✔
1825
{
1826
    if (cr->buf_data) {
10✔
1827
        flb_free(cr->buf_data);
10✔
1828
    }
1829

1830
    if (cr->tag) {
10✔
1831
        flb_sds_destroy(cr->tag);
6✔
1832
    }
1833

1834
    flb_free(cr);
10✔
1835
}
10✔
1836

1837
static int append_to_ring_buffer(struct flb_input_instance *ins,
10✔
1838
                                 int event_type,
1839
                                 size_t records,
1840
                                 const char *tag,
1841
                                 size_t tag_len,
1842
                                 const void *buf,
1843
                                 size_t buf_size)
1844

1845
{
1846
    int ret;
10✔
1847
    int retries = 0;
10✔
1848
    int retry_limit = 10;
10✔
1849
    struct input_chunk_raw *cr;
10✔
1850

1851
    if (buf_size == 0) {
10✔
1852
        flb_plg_debug(ins, "skip ingesting data with 0 bytes");
×
1853
        return -1;
×
1854
    }
1855

1856
    cr = flb_calloc(1, sizeof(struct input_chunk_raw));
10✔
1857
    if (!cr) {
10✔
1858
        flb_errno();
×
1859
        return -1;
×
1860
    }
1861
    cr->ins = ins;
10✔
1862
    cr->event_type = event_type;
10✔
1863

1864
    if (tag && tag_len > 0) {
10✔
1865
        cr->tag = flb_sds_create_len(tag, tag_len);
6✔
1866
        if (!cr->tag) {
6✔
1867
            flb_free(cr);
×
1868
            return -1;
×
1869
        }
1870
    }
1871
    else {
1872
        cr->tag = NULL;
4✔
1873
    }
1874

1875
    cr->records = records;
10✔
1876
    cr->buf_data = flb_malloc(buf_size);
10✔
1877
    if (!cr->buf_data) {
10✔
1878
        flb_errno();
×
1879
        destroy_chunk_raw(cr);
×
1880
        return -1;
×
1881
    }
1882

1883
    /*
1884
     * this memory copy is just a simple overhead, the problem we have is that
1885
     * input instances always assume that they have to release their buffer since
1886
     * the append raw operation already did a copy. Not a big issue but maybe this
1887
     * is a tradeoff...
1888
     */
1889
    memcpy(cr->buf_data, buf, buf_size);
10✔
1890
    cr->buf_size = buf_size;
10✔
1891

1892

1893

1894
retry:
10✔
1895
    /*
1896
     * There is a little chance that the ring buffer is full or due to saturation
1897
     * from the main thread the data is not being consumed. On this scenario we
1898
     * retry up to 'retry_limit' times with a little wait time.
1899
     */
1900
    if (retries >= retry_limit) {
10✔
1901
        flb_plg_error(ins, "could not enqueue records into the ring buffer");
×
1902
        destroy_chunk_raw(cr);
×
1903

1904
        /* update failed retries counter */
1905
        cmt_counter_add(ins->cmt_ring_buffer_retry_failures, cfl_time_now(),
×
1906
                        1, 1, (char *[]) {(char *) flb_input_name(ins)});
×
1907
        return -1;
×
1908
    }
1909

1910
    /* append chunk raw context to the ring buffer */
1911
    ret = flb_ring_buffer_write(ins->rb, (void *) &cr, sizeof(cr));
10✔
1912
    if (ret == -1) {
10✔
1913
        flb_plg_debug(ins, "failed buffer write, retries=%i\n",
×
1914
                      retries);
1915

1916
        /* if the ring buffer is full, we need to retry, update the counters */
1917
        cmt_counter_add(ins->cmt_ring_buffer_retries, cfl_time_now(),
×
1918
                        1, 1, (char *[]) {(char *) flb_input_name(ins)});
×
1919

1920

1921
        /* sleep for 100000 microseconds (100 milliseconds) */
1922
        usleep(100000);
×
1923
        retries++;
×
1924

1925
        goto retry;
×
1926
    }
1927

1928
    /* update successful writes */
1929
    cmt_counter_add(ins->cmt_ring_buffer_writes, cfl_time_now(),
30✔
1930
                    1, 1, (char *[]) {(char *) flb_input_name(ins)});
10✔
1931

1932
    return 0;
10✔
1933
}
1934

1935
/* iterate input instance ring buffer and remove any enqueued input_chunk_raw */
1936
void flb_input_chunk_ring_buffer_cleanup(struct flb_input_instance *ins)
855✔
1937
{
1938
    int ret;
855✔
1939
    struct input_chunk_raw *cr;
855✔
1940

1941
    if (!ins->rb) {
855✔
1942
        return;
×
1943
    }
1944

1945
    while ((ret = flb_ring_buffer_read(ins->rb, (void *) &cr, sizeof(cr))) == 0) {
855✔
1946
        if (cr) {
×
1947
            destroy_chunk_raw(cr);
×
1948
            cr = NULL;
×
1949
        }
1950
    }
1951
}
1952

1953
void flb_input_chunk_ring_buffer_collector(struct flb_config *ctx, void *data)
8,592✔
1954
{
1955
    int ret;
8,592✔
1956
    int tag_len = 0;
8,592✔
1957
    struct mk_list *head;
8,592✔
1958
    struct flb_input_instance *ins;
8,592✔
1959
    struct input_chunk_raw *cr;
8,592✔
1960

1961
    mk_list_foreach(head, &ctx->inputs) {
18,159✔
1962
        ins = mk_list_entry(head, struct flb_input_instance, _head);
9,567✔
1963
        cr = NULL;
9,567✔
1964

1965
        while (1) {
9,587✔
1966
            if (flb_input_buf_paused(ins) == FLB_TRUE) {
9,577✔
1967
                break;
1968
            }
1969

1970
            ret = flb_ring_buffer_read(ins->rb,
9,572✔
1971
                                       (void *) &cr,
1972
                                       sizeof(cr));
1973
            if (ret != 0) {
9,572✔
1974
                break;
1975
            }
1976

1977
            if (cr) {
10✔
1978
                if (cr->tag) {
10✔
1979
                    tag_len = flb_sds_len(cr->tag);
6✔
1980
                }
1981
                else {
1982
                    tag_len = 0;
1983
                }
1984

1985
                input_chunk_append_raw(cr->ins, cr->event_type, cr->records,
10✔
1986
                                       cr->tag, tag_len,
1987
                                       cr->buf_data, cr->buf_size);
10✔
1988
                destroy_chunk_raw(cr);
10✔
1989
            }
1990
            cr = NULL;
10✔
1991
        }
1992

1993
        ins->rb->flush_pending = FLB_FALSE;
9,567✔
1994
    }
1995
}
8,592✔
1996

1997
/*
 * Append raw data on behalf of the input instance 'in'.
 *
 * When flb_input_is_threaded() reports the instance as threaded, the data
 * is enqueued through append_to_ring_buffer(); otherwise it is passed
 * straight to input_chunk_append_raw().
 *
 * Returns the value returned by the function the data was handed to.
 */
int flb_input_chunk_append_raw(struct flb_input_instance *in,
                               int event_type,
                               size_t records,
                               const char *tag, size_t tag_len,
                               const void *buf, size_t buf_size)
{
    if (!flb_input_is_threaded(in)) {
        return input_chunk_append_raw(in, event_type, records,
                                      tag, tag_len, buf, buf_size);
    }

    /* the instance is threaded: go through its ring buffer */
    return append_to_ring_buffer(in, event_type, records,
                                 tag, tag_len,
                                 buf, buf_size);
}
2021

2022
/* Retrieve a raw buffer from a dyntag node */
2023
const void *flb_input_chunk_flush(struct flb_input_chunk *ic, size_t *size)
880✔
2024
{
2025
    int ret;
880✔
2026
    size_t pre_size;
880✔
2027
    size_t post_size;
880✔
2028
    ssize_t diff_size;
880✔
2029
    char *buf = NULL;
880✔
2030

2031
    pre_size = flb_input_chunk_get_real_size(ic);
880✔
2032

2033
    if (cio_chunk_is_up(ic->chunk) == CIO_FALSE) {
880✔
2034
        ret = cio_chunk_up(ic->chunk);
×
2035
        if (ret == -1) {
×
2036
            return NULL;
2037
        }
2038
    }
2039

2040
    /* Lock the internal chunk
2041
     *
2042
     * This operation has to be performed before getting the chunk data
2043
     * pointer because in certain situations it could cause the chunk
2044
     * mapping to be relocated (ie. macos / windows on trim)
2045
     */
2046
    cio_chunk_lock(ic->chunk);
880✔
2047

2048
    /*
2049
     * msgpack-c internal use a raw buffer for it operations, since we
2050
     * already appended data we just can take out the references to avoid
2051
     * a new memory allocation and skip a copy operation.
2052
     */
2053
    ret = cio_chunk_get_content(ic->chunk, &buf, size);
880✔
2054

2055
    if (ret == -1) {
880✔
2056
        flb_error("[input chunk] error retrieving chunk content");
×
2057
        return NULL;
×
2058
    }
2059

2060
    if (!buf) {
880✔
2061
        *size = 0;
×
2062
        return NULL;
×
2063
    }
2064

2065
    /* Set it busy as it likely it's a reference for an outgoing task */
2066
    ic->busy = FLB_TRUE;
880✔
2067

2068
    post_size = flb_input_chunk_get_real_size(ic);
880✔
2069
    if (post_size != pre_size) {
880✔
2070
        diff_size = post_size - pre_size;
×
2071
        flb_input_chunk_update_output_instances(ic, diff_size);
×
2072
    }
2073
    return buf;
880✔
2074
}
2075

2076
int flb_input_chunk_release_lock(struct flb_input_chunk *ic)
×
2077
{
2078
    if (ic->busy == FLB_FALSE) {
×
2079
        return -1;
2080
    }
2081

2082
    ic->busy = FLB_FALSE;
×
2083
    return 0;
×
2084
}
2085

2086
flb_sds_t flb_input_chunk_get_name(struct flb_input_chunk *ic)
53✔
2087
{
2088
    struct cio_chunk *ch;
53✔
2089

2090
    ch = (struct cio_chunk *) ic->chunk;
53✔
2091
    return ch->name;
53✔
2092
}
2093

2094
static inline int input_chunk_has_magic_bytes(char *buf, int len)
1,828✔
2095
{
2096
    unsigned char *p;
1,828✔
2097

2098
    if (len < FLB_INPUT_CHUNK_META_HEADER) {
1,828✔
2099
        return FLB_FALSE;
2100
    }
2101

2102
    p = (unsigned char *) buf;
1,828✔
2103
    if (p[0] == FLB_INPUT_CHUNK_MAGIC_BYTE_0 &&
1,828✔
2104
        p[1] == FLB_INPUT_CHUNK_MAGIC_BYTE_1 && p[3] == 0) {
1,828✔
2105
        return FLB_TRUE;
1,828✔
2106
    }
2107

2108
    return FLB_FALSE;
2109
}
2110

2111
/*
2112
 * Get the event type by retrieving metadata header. NOTE: this function only event type discovery by looking at the
2113
 * headers bytes of a chunk that exists on disk.
2114
 */
2115
int flb_input_chunk_get_event_type(struct flb_input_chunk *ic)
×
2116
{
2117
    int len;
×
2118
    int ret;
×
2119
    int type = -1;
×
2120
    char *buf = NULL;
×
2121

2122
    ret = cio_meta_read(ic->chunk, &buf, &len);
×
2123
    if (ret == -1) {
×
2124
        return -1;
2125
    }
2126

2127
    /* Check metadata header / magic bytes */
2128
    if (input_chunk_has_magic_bytes(buf, len)) {
×
2129
        if (buf[2] == FLB_INPUT_CHUNK_TYPE_LOGS) {
×
2130
            type = FLB_INPUT_LOGS;
2131
        }
2132
        else if (buf[2] == FLB_INPUT_CHUNK_TYPE_METRICS) {
2133
            type = FLB_INPUT_METRICS;
2134
        }
2135
        else if (buf[2] == FLB_INPUT_CHUNK_TYPE_TRACES) {
2136
            type = FLB_INPUT_TRACES;
2137
        }
2138
        else if (buf[2] == FLB_INPUT_CHUNK_TYPE_PROFILES) {
2139
            type = FLB_INPUT_PROFILES;
2140
        }
2141
        else if (buf[2] == FLB_INPUT_CHUNK_TYPE_BLOBS) {
2142
            type = FLB_INPUT_BLOBS;
×
2143
        }
2144
    }
2145
    else {
2146
        type = FLB_INPUT_LOGS;
2147
    }
2148

2149

2150
    return type;
2151
}
2152

2153
int flb_input_chunk_get_tag(struct flb_input_chunk *ic,
1,828✔
2154
                            const char **tag_buf, int *tag_len)
2155
{
2156
    int len;
1,828✔
2157
    int ret;
1,828✔
2158
    char *buf;
1,828✔
2159

2160
    ret = cio_meta_read(ic->chunk, &buf, &len);
1,828✔
2161
    if (ret == -1) {
1,828✔
2162
        *tag_len = -1;
×
2163
        *tag_buf = NULL;
×
2164
        return -1;
×
2165
    }
2166

2167
    /* If magic bytes exists, just set the offset */
2168
    if (input_chunk_has_magic_bytes(buf, len)) {
1,828✔
2169
        *tag_len = len - FLB_INPUT_CHUNK_META_HEADER;
1,828✔
2170
        *tag_buf = buf + FLB_INPUT_CHUNK_META_HEADER;
1,828✔
2171
    }
2172
    else {
2173
        /* Old Chunk version without magic bytes */
2174
        *tag_len = len;
×
2175
        *tag_buf = buf;
×
2176
    }
2177

2178
    return ret;
2179
}
2180

2181
/*
2182
 * Iterates all output instances that the chunk will be flushing to and summarize
2183
 * the total number of bytes in use after ingesting the new data.
2184
 */
2185
void flb_input_chunk_update_output_instances(struct flb_input_chunk *ic,
1,018✔
2186
                                             size_t chunk_size)
2187
{
2188
    struct mk_list *head;
1,018✔
2189
    struct flb_output_instance *o_ins;
1,018✔
2190

2191
    /* for each output plugin, we update the fs_chunks_size */
2192
    mk_list_foreach(head, &ic->in->config->outputs) {
2,317✔
2193
        o_ins = mk_list_entry(head, struct flb_output_instance, _head);
1,299✔
2194
        if (o_ins->total_limit_size == -1) {
1,299✔
2195
            continue;
1,292✔
2196
        }
2197

2198
        if (flb_routes_mask_get_bit(ic->routes_mask,
7✔
2199
                                    o_ins->id,
2200
                                    o_ins->config) != 0) {
2201
            /*
2202
             * if there is match on any index of 1's in the binary, it indicates
2203
             * that the input chunk will flush to this output instance
2204
             */
2205
            FS_CHUNK_SIZE_DEBUG_MOD(o_ins, ic, chunk_size);
7✔
2206
            o_ins->fs_chunks_size += chunk_size;
7✔
2207
            ic->fs_counted = FLB_TRUE;
7✔
2208

2209
            flb_trace("[input chunk] chunk %s update plugin %s fs_chunks_size by %ld bytes, "
1,299✔
2210
                      "the current fs_chunks_size is %ld bytes", flb_input_chunk_get_name(ic),
2211
                      o_ins->name, chunk_size, o_ins->fs_chunks_size);
2212
        }
2213
    }
2214
}
1,018✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc