• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wirenboard / wb-mqtt-serial / 707

06 Oct 2025 09:51AM UTC coverage: 73.346% (-0.05%) from 73.391%
707

push

github

web-flow
Add support for WB-MGE v.3 (#1002)

* Add protocol parameter to Scan RPC.
* Add WB-MGE v.3 detection.
* Enable Fast Modbus events for devices connected through WB-MGE v.3 in Modbus TCP gateway mode.

6750 of 9574 branches covered (70.5%)

321 of 399 new or added lines in 22 files covered. (80.45%)

5 existing lines in 4 files now uncovered.

12796 of 17446 relevant lines covered (73.35%)

410.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.14
/src/serial_client.cpp
1
#include "serial_client.h"
2
#include <chrono>
3
#include <iostream>
4
#include <unistd.h>
5

6
#include "modbus_ext_common.h"
7
#include "write_channel_serial_client_task.h"
8

9
using namespace std::chrono_literals;
10
using namespace std::chrono;
11

12
#define LOG(logger) logger.Log() << "[serial client] "
13

14
namespace
15
{
16
    const auto PORT_OPEN_ERROR_NOTIFICATION_INTERVAL = 5min;
17
    const auto CLOSED_PORT_CYCLE_TIME = 500ms;
18
    const auto MAX_POLL_TIME = 100ms;
19
    // const auto MAX_FLUSHES_WHEN_POLL_IS_DUE = 20;
20
    const auto BALANCING_THRESHOLD = 500ms;
21
    // const auto MIN_READ_EVENTS_TIME = 25ms;
22
    const size_t MAX_EVENT_READ_ERRORS = 10;
23

24
    std::chrono::milliseconds GetReadEventsPeriod(const TPort& port)
79✔
25
    {
26
        auto sendByteTime = port.GetSendTimeBytes(1);
79✔
27
        // >= 115200
28
        if (sendByteTime < 100us) {
79✔
29
            return 50ms;
2✔
30
        }
31
        // >= 38400
32
        if (sendByteTime < 300us) {
77✔
33
            return 100ms;
×
34
        }
35
        // < 38400
36
        return 200ms;
77✔
37
    }
38
};
39

40
TSerialClient::TSerialClient(PFeaturePort port,
99✔
41
                             const TPortOpenCloseLogic::TSettings& openCloseSettings,
42
                             util::TGetNowFn nowFn,
43
                             size_t lowPriorityRateLimit)
99✔
44
    : Port(port),
45
      OpenCloseLogic(openCloseSettings, nowFn),
46
      ConnectLogger(PORT_OPEN_ERROR_NOTIFICATION_INTERVAL, "[serial client] "),
47
      NowFn(nowFn),
48
      LowPriorityRateLimit(lowPriorityRateLimit)
99✔
49
{}
99✔
50

51
TSerialClient::~TSerialClient()
99✔
52
{
53
    if (Port->IsOpen()) {
99✔
54
        Port->Close();
52✔
55
    }
56
}
99✔
57

58
void TSerialClient::AddDevice(PSerialDevice device)
85✔
59
{
60
    if (RegReader)
85✔
61
        throw TSerialDeviceException("can't add registers to the active client");
×
62
    for (const auto& reg: device->GetRegisters()) {
619✔
63
        if (Handlers.find(reg) != Handlers.end())
534✔
64
            throw TSerialDeviceException("duplicate register");
×
65
        auto handler = Handlers[reg] = std::make_shared<TRegisterHandler>(reg);
534✔
66
        RegList.push_back(reg);
534✔
67
        LOG(Debug) << "AddRegister: " << reg;
534✔
68
    }
69
    Devices.push_back(device);
85✔
70
}
85✔
71

72
void TSerialClient::Activate()
620✔
73
{
74
    if (!RegReader) {
620✔
75
        RegReader = std::make_unique<TSerialClientRegisterAndEventsReader>(Devices,
79✔
76
                                                                           GetReadEventsPeriod(*Port),
79✔
77
                                                                           NowFn,
79✔
78
                                                                           LowPriorityRateLimit);
79✔
79
        LastAccessedDevice = std::make_unique<TSerialClientDeviceAccessHandler>(RegReader->GetEventsReader());
79✔
80
    }
81
}
620✔
82

83
void TSerialClient::WaitForPollAndFlush(steady_clock::time_point currentTime, steady_clock::time_point waitUntil)
620✔
84
{
85
    if (currentTime > waitUntil) {
620✔
86
        waitUntil = currentTime;
609✔
87
    }
88

89
    if (Debug.IsEnabled()) {
620✔
90
        LOG(Debug) << Port->GetDescription() << duration_cast<milliseconds>(currentTime.time_since_epoch()).count()
×
91
                   << ": Wait until " << duration_cast<milliseconds>(waitUntil.time_since_epoch()).count();
×
92
    }
93

94
    std::vector<PSerialClientTask> retryTasks;
1,240✔
95
    {
96
        std::unique_lock<std::mutex> lock(TasksMutex);
1,240✔
97

98
        while (TasksCv.wait_until(lock, waitUntil, [this]() { return !Tasks.empty(); })) {
1,978✔
99
            std::vector<PSerialClientTask> tasks;
118✔
100
            Tasks.swap(tasks);
59✔
101
            lock.unlock();
59✔
102
            for (auto& task: tasks) {
184✔
103
                if (task->Run(Port, *LastAccessedDevice, Devices) == ISerialClientTask::TRunResult::RETRY) {
125✔
104
                    retryTasks.push_back(task);
8✔
105
                }
106
            }
107
            lock.lock();
59✔
108
        }
109
    }
110
    for (auto& task: retryTasks) {
628✔
111
        AddTask(task);
8✔
112
    }
113
}
620✔
114

115
void TSerialClient::ProcessPolledRegister(PRegister reg)
1,047✔
116
{
117
    if (reg->GetErrorState().test(TRegister::ReadError) || reg->GetErrorState().test(TRegister::WriteError)) {
1,047✔
118
        if (RegisterErrorCallback) {
151✔
119
            RegisterErrorCallback(reg);
151✔
120
        }
121
    } else {
122
        if (RegisterReadCallback) {
896✔
123
            RegisterReadCallback(reg);
896✔
124
        }
125
    }
126
}
1,047✔
127

128
void TSerialClient::Cycle()
620✔
129
{
130
    Activate();
620✔
131

132
    try {
133
        OpenCloseLogic.OpenIfAllowed(Port);
623✔
134
    } catch (const std::exception& e) {
6✔
135
        ConnectLogger.Log(e.what(), Debug, Error);
3✔
136
    }
137

138
    if (Port->IsOpen()) {
620✔
139
        ConnectLogger.DropTimeout();
616✔
140
        OpenPortCycle();
616✔
141
    } else {
142
        ClosedPortCycle();
4✔
143
    }
144
}
620✔
145

146
void TSerialClient::ClosedPortCycle()
4✔
147
{
148
    auto currentTime = NowFn();
4✔
149
    auto waitUntil = currentTime + CLOSED_PORT_CYCLE_TIME;
4✔
150
    WaitForPollAndFlush(currentTime, waitUntil);
4✔
151

152
    RegReader->ClosedPortCycle(waitUntil, [this](PRegister reg) { ProcessPolledRegister(reg); });
12✔
153
}
4✔
154

155
void TSerialClient::SetTextValue(PRegister reg, const std::string& value)
115✔
156
{
157
    auto handler = GetHandler(reg);
230✔
158
    handler->SetTextValue(value);
115✔
159
    auto serialClientTask =
160
        std::make_shared<TWriteChannelSerialClientTask>(handler, RegisterReadCallback, RegisterErrorCallback);
115✔
161
    AddTask(serialClientTask);
115✔
162
}
115✔
163

164
void TSerialClient::SetReadCallback(const TSerialClient::TRegisterCallback& callback)
99✔
165
{
166
    RegisterReadCallback = callback;
99✔
167
}
99✔
168

169
void TSerialClient::SetErrorCallback(const TSerialClient::TRegisterCallback& callback)
99✔
170
{
171
    RegisterErrorCallback = callback;
99✔
172
}
99✔
173

174
PRegisterHandler TSerialClient::GetHandler(PRegister reg) const
115✔
175
{
176
    auto it = Handlers.find(reg);
115✔
177
    if (it == Handlers.end())
115✔
178
        throw TSerialDeviceException("register not found");
×
179
    return it->second;
230✔
180
}
181

182
void TSerialClient::OpenPortCycle()
616✔
183
{
184
    auto currentTime = NowFn();
616✔
185
    auto waitUntil = RegReader->GetDeadline(currentTime);
616✔
186
    // Limit waiting time to be responsive
187
    waitUntil = std::min(waitUntil, currentTime + MAX_POLL_TIME);
616✔
188
    WaitForPollAndFlush(currentTime, waitUntil);
616✔
189

190
    auto device = RegReader->OpenPortCycle(
191
        *Port,
616✔
192
        [this](PRegister reg) { ProcessPolledRegister(reg); },
1,039✔
193
        *LastAccessedDevice);
1,848✔
194

195
    if (device) {
616✔
196
        OpenCloseLogic.CloseIfNeeded(Port, device->GetConnectionState() == TDeviceConnectionState::DISCONNECTED);
610✔
197
    }
198
}
616✔
199

NEW
200
PFeaturePort TSerialClient::GetPort()
×
201
{
202
    return Port;
×
203
}
204

205
std::list<PSerialDevice> TSerialClient::GetDevices()
1✔
206
{
207
    return Devices;
1✔
208
}
209

210
void TSerialClient::AddTask(PSerialClientTask task)
126✔
211
{
212
    {
213
        std::unique_lock<std::mutex> lock(TasksMutex);
252✔
214
        Tasks.push_back(task);
126✔
215
    }
216
    TasksCv.notify_all();
126✔
217
}
126✔
218

219
void TSerialClient::SuspendPoll(PSerialDevice device, std::chrono::steady_clock::time_point currentTime)
×
220
{
221
    RegReader->SuspendPoll(device, currentTime);
×
222
}
223

224
void TSerialClient::ResumePoll(PSerialDevice device)
×
225
{
226
    RegReader->ResumePoll(device);
×
227
}
228

229
TSerialClientRegisterAndEventsReader::TSerialClientRegisterAndEventsReader(const std::list<PSerialDevice>& devices,
96✔
230
                                                                           std::chrono::milliseconds readEventsPeriod,
231
                                                                           util::TGetNowFn nowFn,
232
                                                                           size_t lowPriorityRateLimit)
96✔
233
    : EventsReader(std::make_shared<TSerialClientEventsReader>(MAX_EVENT_READ_ERRORS)),
234
      RegisterPoller(lowPriorityRateLimit),
235
      TimeBalancer(BALANCING_THRESHOLD),
236
      ReadEventsPeriod(readEventsPeriod),
237
      SpentTime(nowFn),
238
      LastCycleWasTooSmallToPoll(false),
239
      NowFn(nowFn)
96✔
240
{
241
    auto currentTime = NowFn();
96✔
242
    RegisterPoller.SetDevices(devices, currentTime);
96✔
243
    EventsReader->SetDevices(devices);
96✔
244
    TimeBalancer.AddEntry(TClientTaskType::POLLING, currentTime, TPriority::Low);
96✔
245
}
96✔
246

247
void TSerialClientRegisterAndEventsReader::ClosedPortCycle(std::chrono::steady_clock::time_point currentTime,
4✔
248
                                                           TRegisterCallback regCallback)
249
{
250
    EventsReader->SetReadErrors(regCallback);
4✔
251
    RegisterPoller.ClosedPortCycle(currentTime, regCallback);
4✔
252
}
4✔
253

254
class TSerialClientTaskHandler
255
{
256
public:
257
    TClientTaskType TaskType;
258
    TItemAccumulationPolicy Policy;
259
    milliseconds PollLimit;
260
    bool NotReady = true;
261

262
    bool operator()(TClientTaskType task, TItemAccumulationPolicy policy, milliseconds pollLimit)
894✔
263
    {
264
        PollLimit = pollLimit;
894✔
265
        TaskType = task;
894✔
266
        Policy = policy;
894✔
267
        NotReady = false;
894✔
268
        return true;
894✔
269
    }
270
};
271

272
PSerialDevice TSerialClientRegisterAndEventsReader::OpenPortCycle(TFeaturePort& port,
900✔
273
                                                                  TRegisterCallback regCallback,
274
                                                                  TSerialClientDeviceAccessHandler& lastAccessedDevice)
275
{
276
    // Count idle time as high priority task time to faster reach time balancing threshold
277
    if (LastCycleWasTooSmallToPoll) {
900✔
278
        TimeBalancer.UpdateSelectionTime(ceil<milliseconds>(SpentTime.GetSpentTime()), TPriority::High);
58✔
279
    }
280

281
    SpentTime.Start();
900✔
282
    TSerialClientTaskHandler handler;
900✔
283
    TimeBalancer.AccumulateNext(SpentTime.GetStartTime(), handler, TItemSelectionPolicy::All);
900✔
284
    if (handler.NotReady) {
900✔
285
        return nullptr;
6✔
286
    }
287

288
    if (handler.TaskType == TClientTaskType::EVENTS) {
894✔
289
        if (EventsReader && EventsReader->HasDevicesWithEnabledEvents()) {
84✔
290
            lastAccessedDevice.PrepareToAccess(port, nullptr);
81✔
291
            EventsReader->ReadEvents(port, MAX_POLL_TIME, regCallback, NowFn);
81✔
292
            TimeBalancer.UpdateSelectionTime(ceil<milliseconds>(SpentTime.GetSpentTime()), TPriority::High);
81✔
293
            TimeBalancer.AddEntry(TClientTaskType::EVENTS,
81✔
294
                                  SpentTime.GetStartTime() + ReadEventsPeriod,
162✔
295
                                  TPriority::High);
296
        }
297
        SpentTime.Start();
84✔
298

299
        // TODO: Need to notify port open/close logic about errors
300
        return nullptr;
84✔
301
    }
302

303
    // Some registers can have theoretical read time more than poll limit.
304
    // Define special cases when reading can exceed poll limit to read the registers:
305
    // 1. TimeBalancer can force reading of such registers.
306
    // 2. If there are not devices with enabled events, the only limiting timeout is MAX_POLL_TIME.
307
    //    We can miss it and read at least one register.
308
    const bool readAtLeastOneRegister =
309
        (handler.Policy == TItemAccumulationPolicy::Force) || !EventsReader->HasDevicesWithEnabledEvents();
810✔
310

311
    auto res = RegisterPoller.OpenPortCycle(port,
312
                                            SpentTime,
810✔
313
                                            std::min(handler.PollLimit, MAX_POLL_TIME),
1,620✔
314
                                            readAtLeastOneRegister,
315
                                            lastAccessedDevice,
316
                                            regCallback);
1,620✔
317

318
    TimeBalancer.AddEntry(TClientTaskType::POLLING, res.Deadline, TPriority::Low);
810✔
319
    if (res.NotEnoughTime) {
810✔
320
        LastCycleWasTooSmallToPoll = true;
37✔
321
    } else {
322
        LastCycleWasTooSmallToPoll = false;
773✔
323
        TimeBalancer.UpdateSelectionTime(ceil<milliseconds>(SpentTime.GetSpentTime()), TPriority::Low);
773✔
324
    }
325

326
    if (EventsReader->HasDevicesWithEnabledEvents() && !TimeBalancer.Contains(TClientTaskType::EVENTS)) {
810✔
327
        TimeBalancer.AddEntry(TClientTaskType::EVENTS, SpentTime.GetStartTime() + ReadEventsPeriod, TPriority::High);
12✔
328
    }
329

330
    SpentTime.Start();
810✔
331
    return res.Device;
810✔
332
}
333

334
std::chrono::steady_clock::time_point TSerialClientRegisterAndEventsReader::GetDeadline(
900✔
335
    std::chrono::steady_clock::time_point currentTime) const
336
{
337
    return TimeBalancer.GetDeadline();
900✔
338
}
339

340
PSerialClientEventsReader TSerialClientRegisterAndEventsReader::GetEventsReader() const
96✔
341
{
342
    return EventsReader;
96✔
343
}
344

345
void TSerialClientRegisterAndEventsReader::SuspendPoll(PSerialDevice device,
3✔
346
                                                       std::chrono::steady_clock::time_point currentTime)
347
{
348
    RegisterPoller.SuspendPoll(device, currentTime);
3✔
349
}
3✔
350

351
void TSerialClientRegisterAndEventsReader::ResumePoll(PSerialDevice device)
2✔
352
{
353
    RegisterPoller.ResumePoll(device);
2✔
354
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc