• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wirenboard / wb-mqtt-serial / 6

02 Jul 2025 11:08AM UTC coverage: 73.768% (-0.07%) from 73.835%
6

Pull #956

github

u236
update changelog
Pull Request #956: Refactor device/LoadConfig RPC

6427 of 9048 branches covered (71.03%)

0 of 50 new or added lines in 4 files covered. (0.0%)

2 existing lines in 2 files now uncovered.

12278 of 16644 relevant lines covered (73.77%)

306.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/rpc/rpc_port_driver_list.cpp
1
#include "rpc_port_driver_list.h"
2
#include "rpc_helpers.h"
3
#include "serial_port.h"
4
#include "tcp_port.h"
5

6
#define LOG(logger) ::logger.Log() << "[RPC] "
7

8
namespace
9
{
10
    const size_t MAX_TASK_EXECUTORS = 5;
11

12
    std::string GetRequestPortDescription(const Json::Value& request)
×
13
    {
14
        std::string path;
×
15
        if (WBMQTT::JSON::Get(request, "path", path)) {
×
16
            return path;
×
17
        }
18
        int portNumber;
19
        if (WBMQTT::JSON::Get(request, "ip", path) && WBMQTT::JSON::Get(request, "port", portNumber)) {
×
20
            return path + ":" + std::to_string(portNumber);
×
21
        }
22
        throw TRPCException("Port is not defined", TRPCResultCode::RPC_WRONG_PARAM_VALUE);
×
23
    }
24

25
    PPort InitPort(const Json::Value& request)
×
26
    {
27
        if (request.isMember("path")) {
×
28
            std::string path;
×
29
            WBMQTT::JSON::Get(request, "path", path);
×
30
            TSerialPortSettings settings(path, ParseRPCSerialPortSettings(request));
×
31

32
            LOG(Debug) << "Create serial port: " << path;
×
33
            return std::make_shared<TSerialPort>(settings);
×
34
        }
35
        if (request.isMember("ip") && request.isMember("port")) {
×
36
            std::string address;
×
37
            int portNumber = 0;
×
38
            WBMQTT::JSON::Get(request, "ip", address);
×
39
            WBMQTT::JSON::Get(request, "port", portNumber);
×
40
            TTcpPortSettings settings(address, portNumber);
×
41

42
            LOG(Debug) << "Create tcp port: " << address << ":" << portNumber;
×
43
            return std::make_shared<TTcpPort>(settings);
×
44
        }
45
        throw TRPCException("Port is not defined", TRPCResultCode::RPC_WRONG_PARAM_VALUE);
×
46
    }
47
}
48

49
//==========================================================
50
//              TSerialClientTaskExecutor
51
//==========================================================
52

53
// Starts the worker thread that executes queued tasks on the given port.
//
// The worker sleeps on TasksCv until tasks are queued or Running is cleared.
// Each wake-up takes the whole queue, runs it with Mutex released, and closes
// the port once no further tasks have arrived in the meantime.
TSerialClientTaskExecutor::TSerialClientTaskExecutor(PPort port): Port(port), Running(true), Idle(true)
{
    const auto worker = [this]() {
        TSerialClientDeviceAccessHandler lastAccessedDevice(nullptr);
        for (;;) {
            std::unique_lock<std::mutex> guard(Mutex);
            TasksCv.wait(guard, [this]() { return !Running || !Tasks.empty(); });
            if (!Running) {
                return;
            }
            // Defensive re-check; the wait predicate should already guarantee work here.
            if (Tasks.empty()) {
                continue;
            }

            Idle = false;
            // Take ownership of the current batch so producers can keep queueing
            // while the tasks run without the mutex held.
            std::vector<PSerialClientTask> batch;
            batch.swap(Tasks);
            guard.unlock();

            for (auto& pending: batch) {
                try {
                    pending->Run(Port, lastAccessedDevice, std::list<PSerialDevice>());
                } catch (const std::exception& error) {
                    // One failing task must not prevent the rest of the batch from running.
                    LOG(Error) << "Error while running task: " << error.what();
                }
            }

            guard.lock();
            if (!Tasks.empty()) {
                // More work arrived while the batch was running: stay busy.
                continue;
            }
            Idle = true;
            Port->Close();
        }
    };
    Thread = std::thread(worker);
}
85

86
// Stops the worker thread and waits for it to finish.
//
// Running is cleared while holding Mutex. The worker evaluates its wait
// predicate under the same mutex, so changing the flag without the lock could
// land between the worker's predicate check and its blocking wait; the
// notification would then be lost and join() would never return.
TSerialClientTaskExecutor::~TSerialClientTaskExecutor()
{
    {
        std::unique_lock<std::mutex> lock(Mutex);
        Running = false;
    }
    TasksCv.notify_all();
    if (Thread.joinable()) {
        Thread.join();
    }
}
92

93
// Queues a task for the worker thread and wakes it up.
// The task is silently ignored once Running has been cleared.
void TSerialClientTaskExecutor::AddTask(PSerialClientTask task)
{
    if (Running) {
        {
            std::lock_guard<std::mutex> guard(Mutex);
            Tasks.push_back(task);
        }
        // Notify after releasing the mutex so the worker can acquire it immediately.
        TasksCv.notify_all();
    }
}
104

105
// Returns the port this executor was constructed with.
PPort TSerialClientTaskExecutor::GetPort() const
{
    return this->Port;
}
109

110
bool TSerialClientTaskExecutor::IsIdle() const
×
111
{
112
    std::unique_lock<std::mutex> lock(Mutex);
×
113
    return Idle;
×
114
}
115

116
//==========================================================
117
//              TSerialClientTaskRunner
118
//==========================================================
119

120
// Remembers the serial driver whose port drivers RunTask() searches.
// RunTask() tests the pointer before use, so it is presumably allowed to be null.
TSerialClientTaskRunner::TSerialClientTaskRunner(PMQTTSerialDriver serialDriver)
    : SerialDriver(serialDriver)
{
}
122

123
// Dispatches a task to whatever owns the port addressed by the request.
//
// Lookup order:
//   1. If "device_id" is a string and a configured device has that Id, the
//      task is bound to the device and queued on its serial client.
//   2. Otherwise the port description from the request ("path" or "ip:port")
//      is matched against the configured port drivers. On a match the task is
//      queued on that driver's serial client; it is bound to a device first if
//      one matches both "slave_id" and "device_type".
//   3. Otherwise the task goes to a standalone executor for that port, created
//      on demand after idle executors have been pruned.
//
// Throws TRPCException (from GetRequestPortDescription / InitPort) when step 1
// does not match and the request defines no port.
void TSerialClientTaskRunner::RunTask(const Json::Value& request, PSerialClientTask task)
{
    // Step 1: direct lookup by device id. SerialDriver is checked here as well,
    // matching the guard used for the port-description lookup below.
    const auto& deviceId = request["device_id"];
    if (SerialDriver && deviceId.isString()) {
        const auto id = deviceId.asString();
        for (const auto& driver: SerialDriver->GetPortDrivers()) {
            for (const auto& device: driver->GetSerialClient()->GetDevices()) {
                if (device->DeviceConfig()->Id == id) {
                    task->Device = device;
                    driver->GetSerialClient()->AddTask(task);
                    return;
                }
            }
        }
    }

    // Step 2: lookup by port description among the configured port drivers.
    const auto portDescription = GetRequestPortDescription(request);
    if (SerialDriver) {
        auto portDrivers = SerialDriver->GetPortDrivers();
        auto portDriver =
            std::find_if(portDrivers.begin(), portDrivers.end(), [&portDescription](const PSerialPortDriver& driver) {
                return driver->GetSerialClient()->GetPort()->GetDescription(false) == portDescription;
            });
        if (portDriver != portDrivers.end()) {
            const auto slaveId = request["slave_id"].asString();
            const auto deviceType = request["device_type"].asString();
            for (const auto& device: (*portDriver)->GetSerialClient()->GetDevices()) {
                if (device->DeviceConfig()->SlaveId == slaveId && device->DeviceConfig()->DeviceType == deviceType) {
                    task->Device = device;
                    break;
                }
            }
            // The task is queued even when no configured device matched.
            (*portDriver)->GetSerialClient()->AddTask(task);
            return;
        }
    }

    // Step 3: the port is not managed by a port driver; use a standalone executor.
    std::unique_lock<std::mutex> lock(TaskExecutorsMutex);
    auto executor = std::find_if(TaskExecutors.begin(),
                                 TaskExecutors.end(),
                                 [&portDescription](const PSerialClientTaskExecutor& executor) {
                                     return executor->GetPort()->GetDescription(false) == portDescription;
                                 });
    if (executor == TaskExecutors.end()) {
        RemoveUnusedExecutors();
        // InitPort may throw; nothing has been added to TaskExecutors at that point.
        auto newExecutor = std::make_shared<TSerialClientTaskExecutor>(InitPort(request));
        newExecutor->AddTask(task);
        TaskExecutors.push_back(newExecutor);
    } else {
        (*executor)->AddTask(task);
    }
}
175

176
void TSerialClientTaskRunner::RemoveUnusedExecutors()
×
177
{
178
    while (TaskExecutors.size() >= MAX_TASK_EXECUTORS) {
×
179
        auto executorIt = std::find_if(TaskExecutors.begin(),
180
                                       TaskExecutors.end(),
181
                                       [](PSerialClientTaskExecutor executor) { return executor->IsIdle(); });
×
182
        if (executorIt == TaskExecutors.end()) {
×
183
            break;
×
184
        }
185
        TaskExecutors.erase(executorIt);
×
186
    }
187
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc