• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wirenboard / wb-mqtt-serial / 8

03 Jul 2025 11:10AM UTC coverage: 73.733%. First build
8

push

github

u236
refactor device search/create routines

6427 of 9054 branches covered (70.99%)

0 of 60 new or added lines in 3 files covered. (0.0%)

12278 of 16652 relevant lines covered (73.73%)

306.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/rpc/rpc_port_driver_list.cpp
1
#include "rpc_port_driver_list.h"
2
#include "rpc_helpers.h"
3
#include "serial_port.h"
4
#include "tcp_port.h"
5

6
#define LOG(logger) ::logger.Log() << "[RPC] "
7

8
namespace
9
{
10
    const size_t MAX_TASK_EXECUTORS = 5;
11

12
    std::string GetRequestPortDescription(const Json::Value& request)
×
13
    {
14
        std::string path;
×
15
        if (WBMQTT::JSON::Get(request, "path", path)) {
×
16
            return path;
×
17
        }
18
        int portNumber;
19
        if (WBMQTT::JSON::Get(request, "ip", path) && WBMQTT::JSON::Get(request, "port", portNumber)) {
×
20
            return path + ":" + std::to_string(portNumber);
×
21
        }
22
        throw TRPCException("Port is not defined", TRPCResultCode::RPC_WRONG_PARAM_VALUE);
×
23
    }
24

25
    PPort InitPort(const Json::Value& request)
×
26
    {
27
        if (request.isMember("path")) {
×
28
            std::string path;
×
29
            WBMQTT::JSON::Get(request, "path", path);
×
30
            TSerialPortSettings settings(path, ParseRPCSerialPortSettings(request));
×
31

32
            LOG(Debug) << "Create serial port: " << path;
×
33
            return std::make_shared<TSerialPort>(settings);
×
34
        }
35
        if (request.isMember("ip") && request.isMember("port")) {
×
36
            std::string address;
×
37
            int portNumber = 0;
×
38
            WBMQTT::JSON::Get(request, "ip", address);
×
39
            WBMQTT::JSON::Get(request, "port", portNumber);
×
40
            TTcpPortSettings settings(address, portNumber);
×
41

42
            LOG(Debug) << "Create tcp port: " << address << ":" << portNumber;
×
43
            return std::make_shared<TTcpPort>(settings);
×
44
        }
45
        throw TRPCException("Port is not defined", TRPCResultCode::RPC_WRONG_PARAM_VALUE);
×
46
    }
47
}
48

49
//==========================================================
50
//              TSerialClientTaskExecutor
51
//==========================================================
52

53
TSerialClientTaskExecutor::TSerialClientTaskExecutor(PPort port): Port(port), Running(true), Idle(true)
×
54
{
55
    Thread = std::thread([this]() {
×
56
        TSerialClientDeviceAccessHandler lastAccessedDevice(nullptr);
×
57
        while (true) {
58
            std::unique_lock<std::mutex> lock(Mutex);
×
59
            TasksCv.wait(lock, [this]() { return !Tasks.empty() || !Running; });
×
60
            if (!Running) {
×
61
                break;
×
62
            }
63
            if (Tasks.empty()) {
×
64
                continue;
×
65
            }
66
            Idle = false;
×
67
            std::vector<PSerialClientTask> tasksToRun;
×
68
            Tasks.swap(tasksToRun);
×
69
            lock.unlock();
×
70
            for (auto& task: tasksToRun) {
×
71
                try {
72
                    task->Run(Port, lastAccessedDevice, std::list<PSerialDevice>());
×
73
                } catch (const std::exception& e) {
×
74
                    LOG(Error) << "Error while running task: " << e.what();
×
75
                }
76
            }
77
            lock.lock();
×
78
            if (Tasks.empty()) {
×
79
                Idle = true;
×
80
                Port->Close();
×
81
            }
82
        }
83
    });
×
84
}
85

86
TSerialClientTaskExecutor::~TSerialClientTaskExecutor()
×
87
{
88
    Running = false;
×
89
    TasksCv.notify_all();
×
90
    Thread.join();
×
91
}
92

93
void TSerialClientTaskExecutor::AddTask(PSerialClientTask task)
×
94
{
95
    if (!Running) {
×
96
        return;
×
97
    }
98
    {
99
        std::unique_lock<std::mutex> lock(Mutex);
×
100
        Tasks.push_back(task);
×
101
    }
102
    TasksCv.notify_all();
×
103
}
104

105
PPort TSerialClientTaskExecutor::GetPort() const
×
106
{
107
    return Port;
×
108
}
109

110
bool TSerialClientTaskExecutor::IsIdle() const
×
111
{
112
    std::unique_lock<std::mutex> lock(Mutex);
×
113
    return Idle;
×
114
}
115

116
//==========================================================
117
//              TSerialClientTaskRunner
118
//==========================================================
119

120
TSerialClientTaskRunner::TSerialClientTaskRunner(PMQTTSerialDriver serialDriver): SerialDriver(serialDriver)
×
121
{}
122

NEW
123
PSerialClient TSerialClientTaskRunner::GetSerialClient(const Json::Value& request, PSerialDevice& clientDevice)
×
124
{
125
    auto deviceId = request["device_id"];
×
126
    if (deviceId.isString()) {
×
127
        auto id = deviceId.asString();
×
128
        for (auto driver: SerialDriver->GetPortDrivers()) {
×
129
            for (auto device: driver->GetSerialClient()->GetDevices()) {
×
130
                if (device->DeviceConfig()->Id == id) {
×
NEW
131
                    clientDevice = device;
×
NEW
132
                    return driver->GetSerialClient();
×
133
                }
134
            }
135
        }
136
    }
137

138
    auto portDescription = GetRequestPortDescription(request);
×
139
    if (SerialDriver) {
×
140
        auto portDrivers = SerialDriver->GetPortDrivers();
×
141
        auto portDriver =
142
            std::find_if(portDrivers.begin(), portDrivers.end(), [&portDescription](const PSerialPortDriver& driver) {
×
143
                return driver->GetSerialClient()->GetPort()->GetDescription(false) == portDescription;
×
144
            });
×
145
        if (portDriver != portDrivers.end()) {
×
146
            auto slaveId = request["slave_id"].asString();
×
147
            auto deviceType = request["device_type"].asString();
×
148
            for (auto device: (*portDriver)->GetSerialClient()->GetDevices()) {
×
149
                if (device->DeviceConfig()->SlaveId == slaveId && device->DeviceConfig()->DeviceType == deviceType) {
×
NEW
150
                    clientDevice = device;
×
151
                    break;
×
152
                }
153
            }
NEW
154
            return (*portDriver)->GetSerialClient();
×
155
        }
156
    }
157

NEW
158
    return nullptr;
×
159
}
160

NEW
161
PSerialClientTaskExecutor TSerialClientTaskRunner::GetTaskExecutor(const Json::Value& request)
×
162
{
163
    std::unique_lock<std::mutex> lock(TaskExecutorsMutex);
×
NEW
164
    auto portDescription = GetRequestPortDescription(request);
×
165
    auto executor = std::find_if(TaskExecutors.begin(),
166
                                 TaskExecutors.end(),
167
                                 [&portDescription](PSerialClientTaskExecutor executor) {
×
168
                                     return executor->GetPort()->GetDescription(false) == portDescription;
×
169
                                 });
×
170
    if (executor == TaskExecutors.end()) {
×
171
        RemoveUnusedExecutors();
×
172
        auto newExecutor = std::make_shared<TSerialClientTaskExecutor>(InitPort(request));
×
173
        TaskExecutors.push_back(newExecutor);
×
NEW
174
        return newExecutor;
×
175
    }
NEW
176
    return *executor;
×
177
}
178

NEW
179
void TSerialClientTaskRunner::RunTask(const Json::Value& request, PSerialClientTask task)
×
180
{
NEW
181
    PSerialDevice device;
×
NEW
182
    auto serialClient = GetSerialClient(request, device);
×
NEW
183
    if (serialClient) {
×
NEW
184
        serialClient->AddTask(task);
×
NEW
185
        return;
×
186
    }
NEW
187
    GetTaskExecutor(request)->AddTask(task);
×
188
}
189

190
void TSerialClientTaskRunner::RemoveUnusedExecutors()
×
191
{
192
    while (TaskExecutors.size() >= MAX_TASK_EXECUTORS) {
×
193
        auto executorIt = std::find_if(TaskExecutors.begin(),
194
                                       TaskExecutors.end(),
195
                                       [](PSerialClientTaskExecutor executor) { return executor->IsIdle(); });
×
196
        if (executorIt == TaskExecutors.end()) {
×
197
            break;
×
198
        }
199
        TaskExecutors.erase(executorIt);
×
200
    }
201
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc