• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wirenboard / wb-mqtt-serial / 707

06 Oct 2025 09:51AM UTC coverage: 73.346% (-0.05%) from 73.391%
707

push

github

web-flow
Add support for WB-MGE v.3 (#1002)

* Add protocol parameter to Scan RPC.
* Add WB-MGE v.3 detection.
* Enable Fast Modbus events for devices connected through WB-MGE v.3 in Modbus TCP gateway mode.

6750 of 9574 branches covered (70.5%)

321 of 399 new or added lines in 22 files covered. (80.45%)

5 existing lines in 4 files now uncovered.

12796 of 17446 relevant lines covered (73.35%)

410.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/rpc/rpc_port_driver_list.cpp
1
#include "rpc_port_driver_list.h"
2
#include "port/serial_port.h"
3
#include "port/tcp_port.h"
4
#include "rpc_helpers.h"
5

6
#define LOG(logger) ::logger.Log() << "[RPC] "
7

8
namespace
9
{
10
    const size_t MAX_TASK_EXECUTORS = 5;
11

12
    std::string GetRequestPortDescription(const Json::Value& request)
×
13
    {
14
        std::string path;
×
15
        if (WBMQTT::JSON::Get(request, "path", path)) {
×
16
            return path;
×
17
        }
18
        int portNumber;
19
        if (WBMQTT::JSON::Get(request, "ip", path) && WBMQTT::JSON::Get(request, "port", portNumber)) {
×
20
            return path + ":" + std::to_string(portNumber);
×
21
        }
22
        throw TRPCException("Port is not defined", TRPCResultCode::RPC_WRONG_PARAM_VALUE);
×
23
    }
24

NEW
25
    PFeaturePort InitPort(const Json::Value& request)
×
26
    {
27
        if (request.isMember("path")) {
×
28
            std::string path;
×
29
            WBMQTT::JSON::Get(request, "path", path);
×
30
            TSerialPortSettings settings(path, ParseRPCSerialPortSettings(request));
×
31

32
            LOG(Debug) << "Create serial port: " << path;
×
NEW
33
            return std::make_shared<TFeaturePort>(std::make_shared<TSerialPort>(settings), false);
×
34
        }
35
        if (request.isMember("ip") && request.isMember("port")) {
×
36
            std::string address;
×
37
            int portNumber = 0;
×
38
            WBMQTT::JSON::Get(request, "ip", address);
×
39
            WBMQTT::JSON::Get(request, "port", portNumber);
×
40
            TTcpPortSettings settings(address, portNumber);
×
41

42
            LOG(Debug) << "Create tcp port: " << address << ":" << portNumber;
×
NEW
43
            bool isModbusTcp = request.get("protocol", "modbus").asString() == "modbus-tcp";
×
NEW
44
            return std::make_shared<TFeaturePort>(std::make_shared<TTcpPort>(settings), isModbusTcp);
×
45
        }
46
        throw TRPCException("Port is not defined", TRPCResultCode::RPC_WRONG_PARAM_VALUE);
×
47
    }
48
}
49

50
//==========================================================
51
//              TSerialClientTaskExecutor
52
//==========================================================
53

NEW
54
/**
 * Create an executor bound to the given port and start its worker thread.
 *
 * The worker sleeps until tasks are queued or the executor is stopped,
 * runs every queued task on the port and closes the port
 * once no more tasks are pending.
 *
 * @param port port passed to every task run by this executor
 */
TSerialClientTaskExecutor::TSerialClientTaskExecutor(PFeaturePort port): Port(port), Running(true), Idle(true)
{
    auto worker = [this]() {
        TSerialClientDeviceAccessHandler lastAccessedDevice(nullptr);
        for (;;) {
            std::unique_lock<std::mutex> guard(Mutex);
            TasksCv.wait(guard, [this]() { return !Tasks.empty() || !Running; });
            if (!Running) {
                // Executor is being stopped, leave the worker
                return;
            }
            if (Tasks.empty()) {
                continue;
            }
            Idle = false;

            // Take the whole queue, so tasks run without holding the mutex
            std::vector<PSerialClientTask> batch;
            batch.swap(Tasks);
            guard.unlock();

            for (auto& pendingTask: batch) {
                try {
                    pendingTask->Run(Port, lastAccessedDevice, std::list<PSerialDevice>());
                } catch (const std::exception& e) {
                    // A failed task must not prevent the remaining ones from running
                    LOG(Error) << "Error while running task: " << e.what();
                }
            }

            guard.lock();
            if (Tasks.empty()) {
                // Nothing was queued while the batch was running
                Idle = true;
                Port->Close();
            }
        }
    };
    Thread = std::thread(worker);
}
86

87
/**
 * Stop the worker thread and wait for it to finish.
 */
TSerialClientTaskExecutor::~TSerialClientTaskExecutor()
{
    {
        // The flag must be changed while owning the mutex the worker waits on.
        // Otherwise the worker can check the wait predicate, then miss the notification
        // sent below and sleep forever, so join() would never return.
        std::lock_guard<std::mutex> lock(Mutex);
        Running = false;
    }
    TasksCv.notify_all();
    Thread.join();
}
93

94
/**
 * Queue a task and wake up the worker thread.
 * The task is silently dropped if the executor is not running.
 *
 * @param task task to run on the executor's port
 */
void TSerialClientTaskExecutor::AddTask(PSerialClientTask task)
{
    if (Running) {
        {
            std::lock_guard<std::mutex> guard(Mutex);
            Tasks.push_back(task);
        }
        // Notify after the mutex is released
        TasksCv.notify_all();
    }
}
105

NEW
106
/**
 * @return the port this executor was created with
 */
PFeaturePort TSerialClientTaskExecutor::GetPort() const
{
    return Port;
}
110

111
bool TSerialClientTaskExecutor::IsIdle() const
×
112
{
113
    std::unique_lock<std::mutex> lock(Mutex);
×
114
    return Idle;
×
115
}
116

117
//==========================================================
118
//              TSerialClientTaskRunner
119
//==========================================================
120

121
/**
 * @param serialDriver driver whose port drivers are searched when a task is dispatched
 */
TSerialClientTaskRunner::TSerialClientTaskRunner(PMQTTSerialDriver serialDriver)
    : SerialDriver(serialDriver)
{
}
123

124
/**
 * Find the serial client and device addressed by an RPC request.
 *
 * If "device_id" is a string, a device with that id is searched in all port drivers first.
 * Otherwise, or if nothing is found, the port driver is selected by the port description
 * and the device by "slave_id" together with "device_type". With an empty "device_type"
 * a device whose protocol name starts with "modbus" is accepted.
 *
 * @param request RPC request parameters
 * @return found serial client and device; fields stay default-constructed if nothing matches
 * @throws TRPCException if "device_id" lookup fails and the request does not define a port
 */
TSerialClientParams TSerialClientTaskRunner::GetSerialClientParams(const Json::Value& request)
{
    TSerialClientParams params;

    const auto& deviceId = request["device_id"];
    // SerialDriver may be empty, so it is checked before every dereference
    if (SerialDriver && deviceId.isString()) {
        const auto id = deviceId.asString();
        for (const auto& driver: SerialDriver->GetPortDrivers()) {
            const auto serialClient = driver->GetSerialClient();
            for (const auto& device: serialClient->GetDevices()) {
                if (device->DeviceConfig()->Id == id) {
                    params.SerialClient = serialClient;
                    params.Device = device;
                    return params;
                }
            }
        }
    }

    // Evaluated even without a driver, so a request without a port is always rejected
    const auto portDescription = GetRequestPortDescription(request);
    if (!SerialDriver) {
        return params;
    }

    auto portDrivers = SerialDriver->GetPortDrivers();
    auto portDriver =
        std::find_if(portDrivers.begin(), portDrivers.end(), [&portDescription](const PSerialPortDriver& driver) {
            return driver->GetSerialClient()->GetPort()->GetDescription(false) == portDescription;
        });
    if (portDriver == portDrivers.end()) {
        return params;
    }

    params.SerialClient = (*portDriver)->GetSerialClient();
    const auto slaveId = request["slave_id"].asString();
    const auto deviceType = request["device_type"].asString();
    for (const auto& device: params.SerialClient->GetDevices()) {
        if (device->DeviceConfig()->SlaveId != slaveId) {
            continue;
        }
        const bool sameType = device->DeviceConfig()->DeviceType == deviceType;
        // Without an explicit device type any device with a modbus-like protocol is suitable
        if (sameType || (deviceType.empty() && WBMQTT::StringStartsWith(device->Protocol()->GetName(), "modbus"))) {
            params.Device = device;
            break;
        }
    }

    return params;
}
167

168
/**
 * Dispatch a task to the port addressed by an RPC request.
 *
 * If a serial client is found for the request, the task is added to it.
 * Otherwise the task is added to the executor whose port description matches the request;
 * a new executor is created if there is none.
 *
 * @param request RPC request parameters
 * @param task task to run
 * @throws TRPCException if the request does not define a port
 */
void TSerialClientTaskRunner::RunTask(const Json::Value& request, PSerialClientTask task)
{
    auto params = GetSerialClientParams(request);
    if (params.SerialClient) {
        params.SerialClient->AddTask(task);
        return;
    }

    std::unique_lock<std::mutex> guard(TaskExecutorsMutex);
    const auto requestedPort = GetRequestPortDescription(request);
    const auto servesRequestedPort = [&requestedPort](const PSerialClientTaskExecutor& candidate) {
        return candidate->GetPort()->GetDescription(false) == requestedPort;
    };
    const auto existing = std::find_if(TaskExecutors.begin(), TaskExecutors.end(), servesRequestedPort);
    if (existing != TaskExecutors.end()) {
        (*existing)->AddTask(task);
        return;
    }

    // No executor for this port yet: free a slot if possible, then create one
    RemoveUnusedExecutors();
    auto created = std::make_shared<TSerialClientTaskExecutor>(InitPort(request));
    TaskExecutors.push_back(created);
    created->AddTask(task);
}
192

193
void TSerialClientTaskRunner::RemoveUnusedExecutors()
×
194
{
195
    while (TaskExecutors.size() >= MAX_TASK_EXECUTORS) {
×
196
        auto executorIt = std::find_if(TaskExecutors.begin(),
197
                                       TaskExecutors.end(),
198
                                       [](PSerialClientTaskExecutor executor) { return executor->IsIdle(); });
×
199
        if (executorIt == TaskExecutors.end()) {
×
200
            break;
×
201
        }
202
        TaskExecutors.erase(executorIt);
×
203
    }
204
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc