• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

FluentDo / agent / 19780819132

29 Nov 2025 07:27AM UTC coverage: 56.268% (-0.3%) from 56.596%
19780819132

Pull #123

github

web-flow
Merge 3e718788a into 0390f9ff6
Pull Request #123: feat: plugins: add new filter llm_tag plugin

17769 of 33640 branches covered (52.82%)

Branch coverage included in aggregate %.

86704 of 152029 relevant lines covered (57.03%)

5775.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.96
source/tests/runtime/in_forward.c
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2

3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2019-2022 The Fluent Bit Authors
6
 *  Copyright (C) 2015-2018 Treasure Data Inc.
7
 *
8
 *  Licensed under the Apache License, Version 2.0 (the "License");
9
 *  you may not use this file except in compliance with the License.
10
 *  You may obtain a copy of the License at
11
 *
12
 *      http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 *  Unless required by applicable law or agreed to in writing, software
15
 *  distributed under the License is distributed on an "AS IS" BASIS,
16
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 *  See the License for the specific language governing permissions and
18
 *  limitations under the License.
19
 */
20

21
#include <fluent-bit.h>
22
#include <fluent-bit/flb_compat.h>
23
#include <fluent-bit/flb_time.h>
24
#include <fluent-bit/flb_pack.h>
25
#include <fluent-bit/flb_socket.h>
26
#include <fluent-bit/flb_gzip.h>
27
#include <fluent-bit/flb_zstd.h>
28
#include <fluent-bit/flb_pack.h>
29
#include <sys/types.h>
30
#include <sys/stat.h>
31
#ifdef FLB_HAVE_UNIX_SOCKET
32
#include <sys/socket.h>
33
#include <sys/un.h>
34
#endif
35
#include <fcntl.h>
36
#include "flb_tests_runtime.h"
37

38
/* Handles created by test_ctx_create() and released by test_ctx_destroy(). */
struct test_ctx {
    flb_ctx_t *flb;    /* Fluent Bit library context */
    int i_ffd;         /* Input fd  */
    int f_ffd;         /* Filter fd (unused) */
    int o_ffd;         /* Output fd */
};
44

45

46
/* Count of records seen by the lib output callbacks; guarded by result_mutex. */
pthread_mutex_t result_mutex = PTHREAD_MUTEX_INITIALIZER;
int num_output = 0;

/* Return the current record count, read while holding result_mutex. */
static int get_output_num()
{
    int snapshot;

    pthread_mutex_lock(&result_mutex);
    snapshot = num_output;
    pthread_mutex_unlock(&result_mutex);

    return snapshot;
}

/* Overwrite the record count with num while holding result_mutex. */
static void set_output_num(int num)
{
    pthread_mutex_lock(&result_mutex);
    num_output = num;
    pthread_mutex_unlock(&result_mutex);
}

/* Reset the record count to zero. */
static void clear_output_num()
{
    pthread_mutex_lock(&result_mutex);
    num_output = 0;
    pthread_mutex_unlock(&result_mutex);
}
69

70
/*
 * Pack the fixed message ["test", 1234567890, {"test":"msg"}] with
 * flb_pack_json().
 *
 * out_buf - receives the packed buffer (the callers in this file release
 *           it with flb_free()).
 * size    - receives the length of the packed buffer.
 *
 * Returns the flb_pack_json() result; a non-zero value is also recorded as
 * a test failure.
 */
static int create_simple_json(char **out_buf, size_t *size)
{
    char payload[] = "[\"test\", 1234567890,{\"test\":\"msg\"} ]";
    int type;
    int ret;

    ret = flb_pack_json(payload, strlen(payload), out_buf, size, &type, NULL);
    TEST_CHECK(ret==0);

    return ret;
}
81

82

83
/* Callback to check expected results */
84
static int cb_check_result_json(void *record, size_t size, void *data)
7✔
85
{
86
    char *p;
7✔
87
    char *expected;
7✔
88
    char *result;
7✔
89
    int num = get_output_num();
7✔
90

91
    set_output_num(num+1);
7✔
92

93
    expected = (char *) data;
7✔
94
    result = (char *) record;
7✔
95

96
    p = strstr(result, expected);
7✔
97
    TEST_CHECK(p != NULL);
7✔
98

99
    if (p==NULL) {
7✔
100
        flb_error("Expected to find: '%s' in result '%s'",
×
101
                  expected, result);
102
    }
103
    /*
104
     * If you want to debug your test
105
     *
106
     * printf("Expect: '%s' in result '%s'", expected, result);
107
     */
108
    flb_free(record);
7✔
109
    return 0;
7✔
110
}
111

112
static struct test_ctx *test_ctx_create(struct flb_lib_out_cb *data)
7✔
113
{
114
    int i_ffd;
7✔
115
    int o_ffd;
7✔
116
    struct test_ctx *ctx = NULL;
7✔
117

118
    ctx = flb_malloc(sizeof(struct test_ctx));
7✔
119
    if (!TEST_CHECK(ctx != NULL)) {
7✔
120
        TEST_MSG("malloc failed");
×
121
        flb_errno();
×
122
        return NULL;
×
123
    }
124

125
    /* Service config */
126
    ctx->flb = flb_create();
7✔
127
    flb_service_set(ctx->flb,
7✔
128
                    "Flush", "0.200000000",
129
                    "Grace", "1",
130
                    "Log_Level", "error",
131
                    NULL);
132

133
    /* Input */
134
    i_ffd = flb_input(ctx->flb, (char *) "forward", NULL);
7✔
135
    TEST_CHECK(i_ffd >= 0);
7✔
136
    ctx->i_ffd = i_ffd;
7✔
137

138
    /* Output */
139
    o_ffd = flb_output(ctx->flb, (char *) "lib", (void *) data);
7✔
140
    ctx->o_ffd = o_ffd;
7✔
141

142
    return ctx;
7✔
143
}
144

145
static void test_ctx_destroy(struct test_ctx *ctx)
7✔
146
{
147
    TEST_CHECK(ctx != NULL);
7✔
148

149
    sleep(1);
7✔
150
    flb_stop(ctx->flb);
7✔
151
    flb_destroy(ctx->flb);
7✔
152
    flb_free(ctx);
7✔
153
}
7✔
154

155
#define DEFAULT_HOST "127.0.0.1"
156
#define DEFAULT_PORT 24224
157
static flb_sockfd_t connect_tcp(char *in_host, int in_port)
6✔
158
{
159
    int port = in_port;
6✔
160
    char *host = in_host;
6✔
161
    flb_sockfd_t fd;
6✔
162
    int ret;
6✔
163
    struct sockaddr_in addr;
6✔
164

165
    if (host == NULL) {
6✔
166
        host = DEFAULT_HOST;
6✔
167
    }
168
    if (port < 0) {
6✔
169
        port = DEFAULT_PORT;
5✔
170
    }
171

172
    memset(&addr, 0, sizeof(addr));
6✔
173
    fd = socket(PF_INET, SOCK_STREAM, 0);
6✔
174
    if (!TEST_CHECK(fd >= 0)) {
6✔
175
        TEST_MSG("failed to socket. host=%s port=%d errno=%d", host, port, errno);
×
176
        return -1;
×
177
    }
178

179
    addr.sin_family = AF_INET;
6✔
180
    addr.sin_addr.s_addr = inet_addr(host);
6✔
181
    addr.sin_port = htons(port);
6✔
182

183
    ret = connect(fd, (const struct sockaddr *)&addr, sizeof(addr));
6✔
184
    if (!TEST_CHECK(ret >= 0)) {
6✔
185
        TEST_MSG("failed to connect. host=%s port=%d errno=%d", host, port, errno);
×
186
        flb_socket_close(fd);
×
187
        return -1;
×
188
    }
189
    return fd;
190
}
191

192
void flb_test_forward()
1✔
193
{
194
    struct flb_lib_out_cb cb_data;
1✔
195
    struct test_ctx *ctx;
1✔
196
    flb_sockfd_t fd;
1✔
197
    int ret;
1✔
198
    int num;
1✔
199
    ssize_t w_size;
1✔
200

201
    char *buf;
1✔
202
    size_t size;
1✔
203

204
    clear_output_num();
1✔
205

206
    cb_data.cb = cb_check_result_json;
1✔
207
    cb_data.data = "\"test\":\"msg\"";
1✔
208

209
    ctx = test_ctx_create(&cb_data);
1✔
210
    if (!TEST_CHECK(ctx != NULL)) {
1✔
211
        TEST_MSG("test_ctx_create failed");
×
212
        exit(EXIT_FAILURE);
×
213
    }
214

215
    ret = flb_output_set(ctx->flb, ctx->o_ffd,
1✔
216
                         "match", "test",
217
                         "format", "json",
218
                         NULL);
219
    TEST_CHECK(ret == 0);
1✔
220

221
    /* Start the engine */
222
    ret = flb_start(ctx->flb);
1✔
223
    TEST_CHECK(ret == 0);
1✔
224

225
    /* use default host/port */
226
    fd = connect_tcp(NULL, -1);
1✔
227
    if (!TEST_CHECK(fd >= 0)) {
1✔
228
        exit(EXIT_FAILURE);
×
229
    }
230
    create_simple_json(&buf, &size);
1✔
231
    w_size = send(fd, buf, size, 0);
1✔
232
    flb_free(buf);
1✔
233
    if (!TEST_CHECK(w_size == size)) {
1✔
234
        TEST_MSG("failed to send, errno=%d", errno);
×
235
        flb_socket_close(fd);
×
236
        exit(EXIT_FAILURE);
×
237
    }
238

239
    /* waiting to flush */
240
    flb_time_msleep(1500);
1✔
241

242
    num = get_output_num();
1✔
243
    if (!TEST_CHECK(num > 0))  {
1✔
244
        TEST_MSG("no outputs");
×
245
    }
246

247
    flb_socket_close(fd);
1✔
248
    test_ctx_destroy(ctx);
1✔
249
}
1✔
250

251
void flb_test_forward_port()
1✔
252
{
253
    struct flb_lib_out_cb cb_data;
1✔
254
    struct test_ctx *ctx;
1✔
255

256
    flb_sockfd_t fd;
1✔
257
    int ret;
1✔
258
    int num;
1✔
259
    ssize_t w_size;
1✔
260
    char *port = "24000";
1✔
261

262
    char *buf;
1✔
263
    size_t size;
1✔
264

265
    clear_output_num();
1✔
266

267
    cb_data.cb = cb_check_result_json;
1✔
268
    cb_data.data = "\"test\":\"msg\"";
1✔
269

270
    ctx = test_ctx_create(&cb_data);
1✔
271
    if (!TEST_CHECK(ctx != NULL)) {
1✔
272
        TEST_MSG("test_ctx_create failed");
×
273
        exit(EXIT_FAILURE);
×
274
    }
275

276
    ret = flb_input_set(ctx->flb, ctx->i_ffd,
1✔
277
                        "port", port,
278
                        NULL);
279
    TEST_CHECK(ret == 0);
1✔
280

281
    ret = flb_output_set(ctx->flb, ctx->o_ffd,
1✔
282
                         "match", "test",
283
                         "format", "json",
284
                         NULL);
285
    TEST_CHECK(ret == 0);
1✔
286

287
    /* Start the engine */
288
    ret = flb_start(ctx->flb);
1✔
289
    TEST_CHECK(ret == 0);
1✔
290

291
    /* use default host */
292
    fd = connect_tcp(NULL, atoi(port));
1✔
293
    if (!TEST_CHECK(fd >= 0)) {
1✔
294
        exit(EXIT_FAILURE);
×
295
    }
296

297
    create_simple_json(&buf, &size);
1✔
298
    w_size = send(fd, buf, size, 0);
1✔
299
    flb_free(buf);
1✔
300
    if (!TEST_CHECK(w_size == size)) {
1✔
301
        TEST_MSG("failed to send, errno=%d", errno);
×
302
        flb_socket_close(fd);
×
303
        exit(EXIT_FAILURE);
×
304
    }
305

306
    /* waiting to flush */
307
    flb_time_msleep(1500);
1✔
308

309
    num = get_output_num();
1✔
310
    if (!TEST_CHECK(num > 0))  {
1✔
311
        TEST_MSG("no outputs");
×
312
    }
313

314
    flb_socket_close(fd);
1✔
315
    test_ctx_destroy(ctx);
1✔
316
}
1✔
317

318
void flb_test_tag_prefix()
1✔
319
{
320
    struct flb_lib_out_cb cb_data;
1✔
321
    struct test_ctx *ctx;
1✔
322
    char *tag_prefix = "tag_";
1✔
323
    flb_sockfd_t fd;
1✔
324
    int ret;
1✔
325
    int num;
1✔
326
    ssize_t w_size;
1✔
327

328
    char *buf;
1✔
329
    size_t size;
1✔
330

331
    clear_output_num();
1✔
332

333
    cb_data.cb = cb_check_result_json;
1✔
334
    cb_data.data = "\"test\":\"msg\"";
1✔
335

336
    ctx = test_ctx_create(&cb_data);
1✔
337
    if (!TEST_CHECK(ctx != NULL)) {
1✔
338
        TEST_MSG("test_ctx_create failed");
×
339
        exit(EXIT_FAILURE);
×
340
    }
341
    ret = flb_input_set(ctx->flb, ctx->i_ffd,
1✔
342
                        "tag_prefix", tag_prefix,
343
                        NULL);
344
    TEST_CHECK(ret == 0);
1✔
345

346
    ret = flb_output_set(ctx->flb, ctx->o_ffd,
1✔
347
                         "match", "tag_test", /*tag_prefix + "test"*/
348
                         "format", "json",
349
                         NULL);
350
    TEST_CHECK(ret == 0);
1✔
351

352
    /* Start the engine */
353
    ret = flb_start(ctx->flb);
1✔
354
    TEST_CHECK(ret == 0);
1✔
355

356
    /* use default host/port */
357
    fd = connect_tcp(NULL, -1);
1✔
358
    if (!TEST_CHECK(fd >= 0)) {
1✔
359
        exit(EXIT_FAILURE);
×
360
    }
361

362
    create_simple_json(&buf, &size);
1✔
363
    w_size = send(fd, buf, size, 0);
1✔
364
    flb_free(buf);
1✔
365
    if (!TEST_CHECK(w_size == size)) {
1✔
366
        TEST_MSG("failed to send, errno=%d", errno);
×
367
        flb_socket_close(fd);
×
368
        exit(EXIT_FAILURE);
×
369
    }
370

371
    /* waiting to flush */
372
    flb_time_msleep(1500);
1✔
373

374
    num = get_output_num();
1✔
375
    if (!TEST_CHECK(num > 0))  {
1✔
376
        TEST_MSG("no outputs");
×
377
    }
378

379
    flb_socket_close(fd);
1✔
380
    test_ctx_destroy(ctx);
1✔
381
}
1✔
382

383
#ifdef FLB_HAVE_UNIX_SOCKET
384
void flb_test_unix_path()
1✔
385
{
386
    struct flb_lib_out_cb cb_data;
1✔
387
    struct test_ctx *ctx;
1✔
388
    struct sockaddr_un sun;
1✔
389
    flb_sockfd_t fd;
1✔
390
    int ret;
1✔
391
    int num;
1✔
392
    ssize_t w_size;
1✔
393
    char *unix_path = "in_forward_unix";
1✔
394

395
    char *buf;
1✔
396
    size_t size;
1✔
397

398
    clear_output_num();
1✔
399

400
    cb_data.cb = cb_check_result_json;
1✔
401
    cb_data.data = "\"test\":\"msg\"";
1✔
402

403
    ctx = test_ctx_create(&cb_data);
1✔
404
    if (!TEST_CHECK(ctx != NULL)) {
1✔
405
        TEST_MSG("test_ctx_create failed");
×
406
        exit(EXIT_FAILURE);
×
407
    }
408

409
    ret = flb_input_set(ctx->flb, ctx->i_ffd,
1✔
410
                        "unix_path", unix_path,
411
                        NULL);
412
    TEST_CHECK(ret == 0);
1✔
413

414
    ret = flb_output_set(ctx->flb, ctx->o_ffd,
1✔
415
                         "match", "test",
416
                         "format", "json",
417
                         NULL);
418
    TEST_CHECK(ret == 0);
1✔
419

420
    /* Start the engine */
421
    ret = flb_start(ctx->flb);
1✔
422
    TEST_CHECK(ret == 0);
1✔
423

424
    /* waiting to create socket */
425
    flb_time_msleep(200);
1✔
426

427
    memset(&sun, 0, sizeof(sun));
1✔
428
    fd = socket(AF_LOCAL, SOCK_STREAM, 0);
1✔
429
    if (!TEST_CHECK(fd >= 0)) {
1✔
430
        TEST_MSG("failed to socket %s, errno=%d", unix_path, errno);
×
431
        unlink(unix_path);
×
432
        exit(EXIT_FAILURE);
×
433
    }
434

435
    sun.sun_family = AF_LOCAL;
1✔
436
    strcpy(sun.sun_path, unix_path);
1✔
437
    ret = connect(fd, (const struct sockaddr *)&sun, sizeof(sun));
1✔
438
    if (!TEST_CHECK(ret >= 0)) {
1✔
439
        TEST_MSG("failed to connect, errno=%d", errno);
×
440
        flb_socket_close(fd);
×
441
        unlink(unix_path);
×
442
        exit(EXIT_FAILURE);
×
443
    }
444
    create_simple_json(&buf, &size);
1✔
445
    w_size = send(fd, buf, size, 0);
1✔
446
    flb_free(buf);
1✔
447
    if (!TEST_CHECK(w_size == size)) {
1✔
448
        TEST_MSG("failed to write to %s", unix_path);
×
449
        flb_socket_close(fd);
×
450
        unlink(unix_path);
×
451
        exit(EXIT_FAILURE);
×
452
    }
453

454
    /* waiting to flush */
455
    flb_time_msleep(1500);
1✔
456

457
    num = get_output_num();
1✔
458
    if (!TEST_CHECK(num > 0))  {
1✔
459
        TEST_MSG("no outputs");
×
460
    }
461

462
    flb_socket_close(fd);
1✔
463
    test_ctx_destroy(ctx);
1✔
464
}
1✔
465

466

467
void flb_test_unix_perm()
1✔
468
{
469
    struct flb_lib_out_cb cb_data;
1✔
470
    struct test_ctx *ctx;
1✔
471
    struct sockaddr_un sun;
1✔
472
    flb_sockfd_t fd;
1✔
473
    int ret;
1✔
474
    int num;
1✔
475
    ssize_t w_size;
1✔
476
    char *unix_path = "in_forward_unix";
1✔
477
    struct stat sb;
1✔
478

479
    char *buf;
1✔
480
    size_t size;
1✔
481

482
    clear_output_num();
1✔
483

484
    cb_data.cb = cb_check_result_json;
1✔
485
    cb_data.data = "\"test\":\"msg\"";
1✔
486

487
    ctx = test_ctx_create(&cb_data);
1✔
488
    if (!TEST_CHECK(ctx != NULL)) {
1✔
489
        TEST_MSG("test_ctx_create failed");
×
490
        exit(EXIT_FAILURE);
×
491
    }
492

493
    ret = flb_input_set(ctx->flb, ctx->i_ffd,
1✔
494
                        "unix_path", unix_path,
495
                        "unix_perm", "0600",
496
                        NULL);
497
    TEST_CHECK(ret == 0);
1✔
498

499
    ret = flb_output_set(ctx->flb, ctx->o_ffd,
1✔
500
                         "match", "test",
501
                         "format", "json",
502
                         NULL);
503
    TEST_CHECK(ret == 0);
1✔
504

505
    /* Start the engine */
506
    ret = flb_start(ctx->flb);
1✔
507
    TEST_CHECK(ret == 0);
1✔
508

509
    /* waiting to create socket */
510
    flb_time_msleep(200);
1✔
511

512
    memset(&sun, 0, sizeof(sun));
1✔
513
    fd = socket(AF_LOCAL, SOCK_STREAM, 0);
1✔
514
    if (!TEST_CHECK(fd >= 0)) {
1✔
515
        TEST_MSG("failed to socket %s, errno=%d", unix_path, errno);
×
516
        unlink(unix_path);
×
517
        exit(EXIT_FAILURE);
×
518
    }
519

520
    sun.sun_family = AF_LOCAL;
1✔
521
    strcpy(sun.sun_path, unix_path);
1✔
522
    ret = connect(fd, (const struct sockaddr *)&sun, sizeof(sun));
1✔
523
    if (!TEST_CHECK(ret >= 0)) {
1✔
524
        TEST_MSG("failed to connect, errno=%d", errno);
×
525
        flb_socket_close(fd);
×
526
        unlink(unix_path);
×
527
        exit(EXIT_FAILURE);
×
528
    }
529
    create_simple_json(&buf, &size);
1✔
530
    w_size = send(fd, buf, size, 0);
1✔
531
    flb_free(buf);
1✔
532
    if (!TEST_CHECK(w_size == size)) {
1✔
533
        TEST_MSG("failed to write to %s", unix_path);
×
534
        flb_socket_close(fd);
×
535
        unlink(unix_path);
×
536
        exit(EXIT_FAILURE);
×
537
    }
538

539
    /* waiting to flush */
540
    flb_time_msleep(1500);
1✔
541

542
    num = get_output_num();
1✔
543
    if (!TEST_CHECK(num > 0))  {
1✔
544
        TEST_MSG("no outputs");
×
545
    }
546

547

548
    /* File permission */
549
    ret = stat(unix_path, &sb);
1✔
550
    if (!TEST_CHECK(ret == 0)) {
1✔
551
        TEST_MSG("stat failed. errno=%d", errno);
×
552
                test_ctx_destroy(ctx);
×
553
        exit(EXIT_FAILURE);
×
554
    }
555

556
    if (!TEST_CHECK((sb.st_mode & S_IRWXO) == 0)) {
1✔
557
        TEST_MSG("Permssion(others) error. val=0x%x",sb.st_mode & S_IRWXO);
×
558
    }
559
    if (!TEST_CHECK((sb.st_mode & S_IRWXG) == 0)) {
1✔
560
        TEST_MSG("Permssion(group) error. val=0x%x",sb.st_mode & S_IRWXG);
×
561
    }
562
    if (!TEST_CHECK((sb.st_mode & S_IRWXU) == (S_IRUSR | S_IWUSR))) {
1✔
563
        TEST_MSG("Permssion(user) error. val=0x%x",sb.st_mode & S_IRWXU);
×
564
    }
565

566
    flb_socket_close(fd);
1✔
567
    test_ctx_destroy(ctx);
1✔
568
}
1✔
569
#endif /* FLB_HAVE_UNIX_SOCKET */
570

571
static int cb_count_only(void *record, size_t size, void *data)
1✔
572
{
573
    int n = get_output_num();
1✔
574
    set_output_num(n + 1);
1✔
575
    flb_free(record);
1✔
576
    return 0;
1✔
577
}
578

579
void flb_test_threaded_forward_issue_10946()
1✔
580
{
581
    struct flb_lib_out_cb cb = {0};
1✔
582
    flb_ctx_t *ctx;
1✔
583
    int in_ffd, out_ffd, ret;
1✔
584
    int out_count;
1✔
585
    flb_sockfd_t fd;
1✔
586
    char *buf;
1✔
587
    size_t size;
1✔
588
    int root_type;
1✔
589
    struct flb_processor *proc;
1✔
590
    struct flb_processor_unit *pu;
1✔
591
    struct cfl_variant v_key = {
1✔
592
        .type = CFL_VARIANT_STRING,
593
        .data.as_string = "log"
594
    };
595
    struct cfl_variant v_mode = {
1✔
596
        .type = CFL_VARIANT_STRING,
597
        .data.as_string = "partial_message"
598
    };
599
    char *json = "[\"logs\",1234567890,{\"log\":\"hello\"}]";
1✔
600

601
    clear_output_num();
1✔
602

603
    cb.cb   = cb_count_only;
1✔
604
    cb.data = &out_count;
1✔
605

606
    /* Service */
607
    ctx = flb_create();
1✔
608
    TEST_CHECK(ctx != NULL);
1✔
609
    flb_service_set(ctx,
1✔
610
                    "Flush", "0.200000000",
611
                    "Grace", "1",
612
                    "Log_Level", "error",
613
                    NULL);
614

615
    in_ffd = flb_input(ctx, (char *) "forward", NULL);
1✔
616
    TEST_CHECK(in_ffd >= 0);
1✔
617
    ret = flb_input_set(ctx, in_ffd,
1✔
618
                        "tag", "logs",
619
                        "threaded", "true",
620
                        NULL);
621
    TEST_CHECK(ret == 0);
1✔
622

623
    /* Attach a logs-processor: multiline (minimal settings).
624
     * This mirrors the YAML:
625
     *   processors.logs:
626
     *     - name: multiline
627
     *       multiline.key_content: log
628
     *       mode: partial_message
629
     */
630
    proc = flb_processor_create(ctx->config, "ut", NULL, 0);
1✔
631
    TEST_CHECK(proc != NULL);
1✔
632

633
    pu = flb_processor_unit_create(proc, FLB_PROCESSOR_LOGS, "multiline");
1✔
634
    TEST_CHECK(pu != NULL);
1✔
635

636
    ret = flb_processor_unit_set_property(pu, "multiline.key_content", &v_key);
1✔
637
    TEST_CHECK(ret == 0);
1✔
638

639
    ret = flb_processor_unit_set_property(pu, "mode", &v_mode);
1✔
640
    TEST_CHECK(ret == 0);
1✔
641

642
    ret = flb_input_set_processor(ctx, in_ffd, proc);
1✔
643
    TEST_CHECK(ret == 0);
1✔
644

645
    /* Output: lib -> count arrivals of tag 'logs' (after processors) */
646
    out_ffd = flb_output(ctx, (char *) "lib", (void *) &cb);
1✔
647
    TEST_CHECK(out_ffd >= 0);
1✔
648
    ret = flb_output_set(ctx, out_ffd,
1✔
649
                         "match", "logs",
650
                         "format", "json",
651
                         NULL);
652
    TEST_CHECK(ret == 0);
1✔
653

654
    /* Start engine */
655
    ret = flb_start(ctx);
1✔
656
    TEST_CHECK(ret == 0);
1✔
657

658
    /* Send a single Forward frame to 'logs' */
659
    fd = connect_tcp(NULL, -1);
1✔
660
    TEST_CHECK(fd >= 0);
1✔
661

662
    /* ["logs", 1234567890, {"log":"hello"}] */
663
    ret = flb_pack_json(json, strlen(json), &buf, &size, &root_type, NULL);
1✔
664
    TEST_CHECK(ret == 0);
1✔
665
    TEST_CHECK(send(fd, buf, size, 0) == (ssize_t) size);
1✔
666
    flb_free(buf);
1✔
667

668
    /* Give it a moment to flush */
669
    flb_time_msleep(1500);
1✔
670

671
    /* With the fix, at least one record must arrive */
672
    out_count = get_output_num();
1✔
673
    TEST_CHECK(out_count > 0);
1✔
674
    if (!TEST_CHECK(out_count > 0)) {
1✔
675
        TEST_MSG("no outputs with threaded+multiline; emitter RB/collector likely missing");
×
676
    }
677

678
    /* Cleanup */
679
    flb_socket_close(fd);
1✔
680
    flb_stop(ctx);
1✔
681
    flb_destroy(ctx);
1✔
682
}
1✔
683

684
/* Static callback for fw_make_ctx_with_forward to avoid stack-use-after-return */
685
static struct flb_lib_out_cb fw_ctx_cb = {0};
686

687
static flb_ctx_t *fw_make_ctx_with_forward(int *in_ffd_out, int *out_ffd_out)
4✔
688
{
689
    flb_ctx_t *ctx;
4✔
690
    int in_ffd, out_ffd, ret;
4✔
691

692
    ctx = flb_create();
4✔
693
    TEST_CHECK(ctx != NULL);
4✔
694
    if (!ctx) { return NULL; }
4✔
695

696
    flb_service_set(ctx,
4✔
697
                    "Flush", "0.200000000",
698
                    "Grace", "1",
699
                    "Log_Level", "error",
700
                    NULL);
701

702
    /* forward input */
703
    in_ffd = flb_input(ctx, (char *) "forward", NULL);
4✔
704
    TEST_CHECK(in_ffd >= 0);
4✔
705
    if (in_ffd < 0) { flb_destroy(ctx); return NULL; }
4✔
706

707
    /* lib output: count only (no payload check) */
708
    fw_ctx_cb.cb   = cb_count_only;
4✔
709
    fw_ctx_cb.data = NULL;
4✔
710
    out_ffd = flb_output(ctx, (char *) "lib", (void *) &fw_ctx_cb);
4✔
711
    TEST_CHECK(out_ffd >= 0);
4✔
712
    if (out_ffd < 0) {
4✔
713
        flb_destroy(ctx);
×
714
        return NULL;
×
715
    }
716
    ret = flb_output_set(ctx, out_ffd,
4✔
717
                         "match", "*",
718
                         "format", "json",
719
                         NULL);
720
    TEST_CHECK(ret == 0);
4✔
721

722
    if (in_ffd_out)  *in_ffd_out  = in_ffd;
4✔
723
    if (out_ffd_out) *out_ffd_out = out_ffd;
4✔
724
    return ctx;
725
}
726

727
/* 1) users-only => must fail to start (fail-close) */
728
void flb_test_fw_auth_users_only_fail_start()
1✔
729
{
730
    flb_ctx_t *ctx;
1✔
731
    int in_ffd, out_ffd, ret;
1✔
732

733
    ctx = fw_make_ctx_with_forward(&in_ffd, &out_ffd);
1✔
734
    TEST_CHECK(ctx != NULL);
1✔
735
    if (!ctx) {
1✔
736
        return;
×
737
    }
738

739
    ret = flb_input_set(ctx, in_ffd,
1✔
740
                        "tag", "test",
741
                        "security.users", "alice s3cr3t",
742
                        NULL);
743
    TEST_CHECK(ret == 0);
1✔
744

745
    ret = flb_start(ctx);
1✔
746
    TEST_CHECK(ret != 0);
1✔
747
    if (ret == 0) {
1✔
748
        TEST_MSG("users-only config unexpectedly started; fail-close not enforced");
×
749
        flb_stop(ctx);
×
750
    }
751
    flb_destroy(ctx);
1✔
752
}
753

754
/* 2) empty_shared_key + users => start OK */
755
void flb_test_fw_auth_empty_shared_key_plus_users_start_ok()
1✔
756
{
757
    flb_ctx_t *ctx;
1✔
758
    int in_ffd, out_ffd, ret;
1✔
759

760
    ctx = fw_make_ctx_with_forward(&in_ffd, &out_ffd);
1✔
761
    TEST_CHECK(ctx != NULL);
1✔
762
    if (!ctx) { return; }
1✔
763

764
    ret = flb_input_set(ctx, in_ffd,
1✔
765
                        "tag", "test",
766
                        "empty_shared_key", "true",
767
                        "security.users", "alice s3cr3t",
768
                        NULL);
769
    TEST_CHECK(ret == 0);
1✔
770

771
    ret = flb_start(ctx);
1✔
772
    TEST_CHECK(ret == 0);
1✔
773
    if (ret == 0) {
1✔
774
        flb_stop(ctx);
1✔
775
    }
776
    flb_destroy(ctx);
1✔
777
}
778

779
/* 3) shared_key only => start OK (backward compatible) */
780
void flb_test_fw_auth_shared_key_only_start_ok()
1✔
781
{
782
    flb_ctx_t *ctx;
1✔
783
    int in_ffd, out_ffd, ret;
1✔
784

785
    ctx = fw_make_ctx_with_forward(&in_ffd, &out_ffd);
1✔
786
    TEST_CHECK(ctx != NULL);
1✔
787
    if (!ctx) { return; }
1✔
788

789
    ret = flb_input_set(ctx, in_ffd,
1✔
790
                        "tag", "test",
791
                        "shared_key", "k",
792
                        NULL);
793
    TEST_CHECK(ret == 0);
1✔
794

795
    ret = flb_start(ctx);
1✔
796
    TEST_CHECK(ret == 0);
1✔
797
    if (ret == 0) {
1✔
798
        flb_stop(ctx);
1✔
799
    }
800
    flb_destroy(ctx);
1✔
801
}
802

803
/* 4) shared_key + users => start OK (both checks) */
804
void flb_test_fw_auth_shared_key_plus_users_start_ok()
1✔
805
{
806
    flb_ctx_t *ctx;
1✔
807
    int in_ffd, out_ffd, ret;
1✔
808

809
    ctx = fw_make_ctx_with_forward(&in_ffd, &out_ffd);
1✔
810
    TEST_CHECK(ctx != NULL);
1✔
811
    if (!ctx) { return; }
1✔
812

813
    ret = flb_input_set(ctx, in_ffd,
1✔
814
                        "tag", "test",
815
                        "shared_key", "k",
816
                        "security.users", "alice s3cr3t",
817
                        NULL);
818
    TEST_CHECK(ret == 0);
1✔
819

820
    ret = flb_start(ctx);
1✔
821
    TEST_CHECK(ret == 0);
1✔
822
    if (ret == 0) {
1✔
823
        flb_stop(ctx);
1✔
824
    }
825
    flb_destroy(ctx);
1✔
826
}
827

828
/*
829
 * Creates a forward-protocol-compliant, Gzip-compressed MessagePack payload.
830
 * The final structure is: [tag, compressed_events, {options}]
831
 */
832
static int create_simple_json_gzip(msgpack_sbuffer *sbuf)
1✔
833
{
834
    int ret;
1✔
835
    char *event_buf;
1✔
836
    size_t event_size;
1✔
837
    char *compressed_buf;
1✔
838
    size_t compressed_size;
1✔
839
    int root_type;
1✔
840
    msgpack_packer pck;
1✔
841

842
    char *tag = "test";
1✔
843
    char event_json[] = "[1234567890,{\"test\":\"msg\"}]";
1✔
844

845
    ret = flb_pack_json(event_json, strlen(event_json),
1✔
846
                        &event_buf, &event_size, &root_type, NULL);
847
    if (!TEST_CHECK(ret == 0)) {
1✔
848
        return -1;
849
    }
850

851
    ret = flb_gzip_compress(event_buf, event_size,
1✔
852
                            (void **)&compressed_buf, &compressed_size);
853
    if (!TEST_CHECK(ret == 0)) {
1✔
854
        flb_free(event_buf);
×
855
        return -1;
×
856
    }
857
    flb_free(event_buf);
1✔
858

859
    /* Create temporary msgpack buffer */
860
    msgpack_packer_init(&pck, sbuf, msgpack_sbuffer_write);
1✔
861

862
    msgpack_pack_array(&pck, 3);
1✔
863
    msgpack_pack_str_with_body(&pck, tag, strlen(tag));
1✔
864
    msgpack_pack_bin_with_body(&pck, compressed_buf, compressed_size);
1✔
865
    msgpack_pack_map(&pck, 2);
1✔
866
    msgpack_pack_str_with_body(&pck, "compressed", 10);
1✔
867
    msgpack_pack_str_with_body(&pck, "gzip", 4);
1✔
868
    msgpack_pack_str_with_body(&pck, "size", 4);
1✔
869
    msgpack_pack_uint64(&pck, event_size);
1✔
870

871
    flb_free(compressed_buf);
1✔
872

873
    return 0;
1✔
874
}
875

876
void flb_test_forward_gzip()
1✔
877
{
878
    struct flb_lib_out_cb cb_data;
1✔
879
    struct test_ctx *ctx;
1✔
880
    flb_sockfd_t fd;
1✔
881
    int ret;
1✔
882
    int num;
1✔
883
    ssize_t w_size;
1✔
884

885
    char *buf;
1✔
886
    size_t size;
1✔
887

888
    msgpack_sbuffer sbuf;
1✔
889

890
    clear_output_num();
1✔
891

892
    cb_data.cb = cb_check_result_json;
1✔
893
    cb_data.data = "\"test\":\"msg\"";
1✔
894

895
    ctx = test_ctx_create(&cb_data);
1✔
896
    if (!TEST_CHECK(ctx != NULL)) {
1✔
897
        TEST_MSG("test_ctx_create failed");
×
898
        exit(EXIT_FAILURE);
×
899
    }
900

901
    ret = flb_output_set(ctx->flb, ctx->o_ffd,
1✔
902
                         "match", "test",
903
                         "format", "json",
904
                         NULL);
905
    TEST_CHECK(ret == 0);
1✔
906

907
    ret = flb_start(ctx->flb);
1✔
908
    TEST_CHECK(ret == 0);
1✔
909

910
    fd = connect_tcp(NULL, -1);
1✔
911
    if (!TEST_CHECK(fd >= 0)) {
1✔
912
        exit(EXIT_FAILURE);
×
913
    }
914

915
    msgpack_sbuffer_init(&sbuf);
1✔
916
    create_simple_json_gzip(&sbuf);
1✔
917

918
    w_size = send(fd, sbuf.data, sbuf.size, 0);
1✔
919
    if (!TEST_CHECK(w_size == sbuf.size)) {
1✔
920
        TEST_MSG("failed to send, errno=%d", errno);
×
921
        flb_socket_close(fd);
×
922
        msgpack_sbuffer_destroy(&sbuf);
×
923
        exit(EXIT_FAILURE);
×
924
    }
925

926
    msgpack_sbuffer_destroy(&sbuf);
1✔
927

928
    flb_time_msleep(1500);
1✔
929

930
    num = get_output_num();
1✔
931
    if (!TEST_CHECK(num > 0))  {
1✔
932
        TEST_MSG("no outputs");
×
933
    }
934

935
    flb_socket_close(fd);
1✔
936
    test_ctx_destroy(ctx);
1✔
937
}
1✔
938

939
/*
940
 * Creates a forward-protocol-compliant, Zstd-compressed MessagePack payload.
941
 * The final structure is: [tag, compressed_events, {options}]
942
 */
943
static int create_simple_json_zstd(msgpack_sbuffer *sbuf)
1✔
944
{
945
    int ret;
1✔
946
    char *event_buf;
1✔
947
    size_t event_size;
1✔
948
    char *compressed_buf;
1✔
949
    size_t compressed_size;
1✔
950
    int root_type;
1✔
951
    msgpack_packer pck;
1✔
952

953
    char *tag = "test";
1✔
954
    char event_json[] = "[1234567890,{\"test\":\"msg\"}]";
1✔
955

956
    ret = flb_pack_json(event_json, strlen(event_json),
1✔
957
                        &event_buf, &event_size, &root_type, NULL);
958
    if (!TEST_CHECK(ret == 0)) {
1✔
959
        return -1;
960
    }
961

962
    ret = flb_zstd_compress(event_buf, event_size,
1✔
963
                            (void **)&compressed_buf, &compressed_size);
964
    if (!TEST_CHECK(ret == 0)) {
1✔
965
        flb_free(event_buf);
×
966
        return -1;
×
967
    }
968
    flb_free(event_buf);
1✔
969

970
    /* Create temporary msgpack buffer */
971
    msgpack_packer_init(&pck, sbuf, msgpack_sbuffer_write);
1✔
972

973
    msgpack_pack_array(&pck, 3);
1✔
974
    msgpack_pack_str_with_body(&pck, tag, strlen(tag));
1✔
975
    msgpack_pack_bin_with_body(&pck, compressed_buf, compressed_size);
1✔
976
    msgpack_pack_map(&pck, 2);
1✔
977
    msgpack_pack_str_with_body(&pck, "compressed", 10);
1✔
978
    msgpack_pack_str_with_body(&pck, "zstd", 4);
1✔
979
    msgpack_pack_str_with_body(&pck, "size", 4);
1✔
980
    msgpack_pack_uint64(&pck, event_size);
1✔
981

982
    flb_free(compressed_buf);
1✔
983

984
    return 0;
1✔
985
}
986

987
void flb_test_forward_zstd()
1✔
988
{
989
    struct flb_lib_out_cb cb_data;
1✔
990
    struct test_ctx *ctx;
1✔
991
    flb_sockfd_t fd;
1✔
992
    int ret;
1✔
993
    int num;
1✔
994
    ssize_t w_size;
1✔
995

996
    char *buf;
1✔
997
    size_t size;
1✔
998

999
    msgpack_sbuffer sbuf;
1✔
1000

1001
    clear_output_num();
1✔
1002

1003
    cb_data.cb = cb_check_result_json;
1✔
1004
    cb_data.data = "\"test\":\"msg\"";
1✔
1005

1006
    ctx = test_ctx_create(&cb_data);
1✔
1007
    if (!TEST_CHECK(ctx != NULL)) {
1✔
1008
        TEST_MSG("test_ctx_create failed");
×
1009
        exit(EXIT_FAILURE);
×
1010
    }
1011

1012
    ret = flb_output_set(ctx->flb, ctx->o_ffd,
1✔
1013
                         "match", "test",
1014
                         "format", "json",
1015
                         NULL);
1016
    TEST_CHECK(ret == 0);
1✔
1017

1018
    ret = flb_start(ctx->flb);
1✔
1019
    TEST_CHECK(ret == 0);
1✔
1020

1021
    fd = connect_tcp(NULL, -1);
1✔
1022
    if (!TEST_CHECK(fd >= 0)) {
1✔
1023
        exit(EXIT_FAILURE);
×
1024
    }
1025

1026
    msgpack_sbuffer_init(&sbuf);
1✔
1027
    create_simple_json_zstd(&sbuf);
1✔
1028

1029
    w_size = send(fd, sbuf.data, sbuf.size, 0);
1✔
1030
    if (!TEST_CHECK(w_size == sbuf.size)) {
1✔
1031
        TEST_MSG("failed to send, errno=%d", errno);
×
1032
        flb_socket_close(fd);
×
1033
        msgpack_sbuffer_destroy(&sbuf);
×
1034
        exit(EXIT_FAILURE);
×
1035
    }
1036

1037
    msgpack_sbuffer_destroy(&sbuf);
1✔
1038

1039
    flb_time_msleep(1500);
1✔
1040

1041
    num = get_output_num();
1✔
1042
    if (!TEST_CHECK(num > 0))  {
1✔
1043
        TEST_MSG("no outputs");
×
1044
    }
1045

1046
    flb_socket_close(fd);
1✔
1047
    test_ctx_destroy(ctx);
1✔
1048
}
1✔
1049

1050

1051
/*
 * Test registry: each entry pairs a test name with its function, and the
 * {NULL, NULL} entry ends the list. The unix socket tests are registered
 * only when FLB_HAVE_UNIX_SOCKET is defined.
 */
TEST_LIST = {
    {"forward", flb_test_forward},
    {"forward_port", flb_test_forward_port},
    {"tag_prefix", flb_test_tag_prefix},
#ifdef FLB_HAVE_UNIX_SOCKET
    {"unix_path", flb_test_unix_path},
    {"unix_perm", flb_test_unix_perm},
#endif
    {"issue_10946", flb_test_threaded_forward_issue_10946},
    {"fw_auth_users_only_fail_start", flb_test_fw_auth_users_only_fail_start},
    {"fw_auth_empty_shared_key_plus_users_start_ok", flb_test_fw_auth_empty_shared_key_plus_users_start_ok},
    {"fw_auth_shared_key_only_start_ok", flb_test_fw_auth_shared_key_only_start_ok},
    {"fw_auth_shared_key_plus_users_start_ok", flb_test_fw_auth_shared_key_plus_users_start_ok},
    {"forward_gzip", flb_test_forward_gzip},
    {"forward_zstd", flb_test_forward_zstd},
    {NULL, NULL}
};
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc