• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 5462348362

pending completion
5462348362

Pull #219

github

web-flow
Merge fbf2f7ae2 into 99464cfdf
Pull Request #219: Add chunk stream groups

589 of 589 new or added lines in 16 files covered. (100.0%)

5397 of 7206 relevant lines covered (74.9%)

52888.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.33
/src/recv_inproc.cpp
1
/* Copyright 2018-2019, 2023 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
/**
18
 * @file
19
 */
20

21
#include <cstddef>
22
#include <memory>
23
#include <functional>
24
#include <spead2/common_inproc.h>
25
#include <spead2/common_logging.h>
26
#include <spead2/recv_inproc.h>
27
#include <spead2/recv_stream.h>
28

29
namespace spead2
30
{
31
namespace recv
32
{
33

34
inproc_reader::inproc_reader(
146✔
35
    stream &owner,
36
    std::shared_ptr<inproc_queue> queue)
146✔
37
    : reader(owner),
38
    queue(std::move(queue)),
146✔
39
    data_sem_wrapper(wrap_fd(owner.get_io_service(),
146✔
40
                             this->queue->buffer.get_data_sem().get_fd()))
292✔
41
{
42
    enqueue(make_handler_context());
146✔
43
}
146✔
44

45
void inproc_reader::process_one_packet(stream_base::add_packet_state &state,
8,000✔
46
                                       const inproc_queue::packet &packet)
47
{
48
    packet_header header;
49
    std::size_t size = decode_packet(header, packet.data.get(), packet.size);
8,000✔
50
    if (size == packet.size)
7,995✔
51
    {
52
        state.add_packet(header);
7,995✔
53
    }
54
    else if (size != 0)
×
55
    {
56
        log_info("discarding packet due to size mismatch (%1% != %2%)", size, packet.size);
×
57
    }
58
}
7,992✔
59

60
void inproc_reader::packet_handler(
8,024✔
61
    handler_context ctx,
62
    stream_base::add_packet_state &state,
63
    const boost::system::error_code &error,
64
    std::size_t bytes_transferred)
65
{
66
    if (!error)
8,024✔
67
    {
68
        try
69
        {
70
            inproc_queue::packet packet = queue->buffer.try_pop();
8,022✔
71
            process_one_packet(state, packet);
8,000✔
72
            /* TODO: could grab a batch of packets to amortise costs */
73
        }
7,992✔
74
        catch (ringbuffer_stopped &)
47✔
75
        {
76
            state.stop();
47✔
77
        }
47✔
78
        catch (ringbuffer_empty &)
×
79
        {
80
            // spurious wakeup - no action needed
81
        }
×
82
    }
83
    else if (error != boost::asio::error::operation_aborted)
×
84
        log_warning("Error in inproc receiver: %1%", error.message());
×
85

86
    if (!state.is_stopped())
8,046✔
87
        enqueue(std::move(ctx));
7,910✔
88
}
8,042✔
89

90
void inproc_reader::enqueue(handler_context ctx)
8,055✔
91
{
92
    using namespace std::placeholders;
93
    data_sem_wrapper.async_read_some(
8,052✔
94
        boost::asio::null_buffers(),
8,052✔
95
        bind_handler(std::move(ctx), std::bind(&inproc_reader::packet_handler, this, _1, _2, _3, _4)));
16,109✔
96
}
8,052✔
97

98
bool inproc_reader::lossy() const
146✔
99
{
100
    return false;
146✔
101
}
102

103
} // namespace recv
104
} // namespace spead2
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc