• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wirenboard / wb-mqtt-serial / 2

29 Dec 2025 12:28PM UTC coverage: 76.817% (+4.0%) from 72.836%
2

Pull #1045

github

54aa0c
pgasheev
up changelog
Pull Request #1045: Fix firmware version in WB-M1W2 template

6873 of 9161 branches covered (75.02%)

12966 of 16879 relevant lines covered (76.82%)

1651.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.14
/src/serial_client.cpp
1
#include "serial_client.h"
2
#include <chrono>
3
#include <iostream>
4
#include <unistd.h>
5

6
#include "modbus_ext_common.h"
7
#include "write_channel_serial_client_task.h"
8

9
using namespace std::chrono_literals;
10
using namespace std::chrono;
11

12
#define LOG(logger) logger.Log() << "[serial client] "
13

14
namespace
15
{
16
    const auto PORT_OPEN_ERROR_NOTIFICATION_INTERVAL = 5min;
17
    const auto CLOSED_PORT_CYCLE_TIME = 500ms;
18
    const auto MAX_POLL_TIME = 100ms;
19
    // const auto MAX_FLUSHES_WHEN_POLL_IS_DUE = 20;
20
    const auto BALANCING_THRESHOLD = 500ms;
21
    // const auto MIN_READ_EVENTS_TIME = 25ms;
22
    const size_t MAX_EVENT_READ_ERRORS = 10;
23

24
    std::chrono::milliseconds GetReadEventsPeriod(const TPort& port)
158✔
25
    {
26
        auto sendByteTime = port.GetSendTimeBytes(1);
158✔
27
        // >= 115200
28
        if (sendByteTime < 100us) {
158✔
29
            return 50ms;
4✔
30
        }
31
        // >= 38400
32
        if (sendByteTime < 300us) {
154✔
33
            return 100ms;
×
34
        }
35
        // < 38400
36
        return 200ms;
154✔
37
    }
38
};
39

40
// Stores the port, the open/close settings, the time source and the low priority
// rate limit. The register reader is not created here; Activate() creates it.
TSerialClient::TSerialClient(PFeaturePort port,
                             const TPortOpenCloseLogic::TSettings& openCloseSettings,
                             util::TGetNowFn nowFn,
                             size_t lowPriorityRateLimit)
    : Port(port),
      OpenCloseLogic(openCloseSettings, nowFn),
      ConnectLogger(PORT_OPEN_ERROR_NOTIFICATION_INTERVAL, "[serial client] "),
      NowFn(nowFn),
      LowPriorityRateLimit(lowPriorityRateLimit)
{
}
198✔
50

51
// Closes the port if it is still open.
TSerialClient::~TSerialClient()
{
    if (!Port->IsOpen()) {
        return;
    }
    Port->Close();
}
198✔
57

58
// Registers a device and all of its registers in the client.
//
// Throws TSerialDeviceException if the client is already active (RegReader exists)
// or if one of the device's registers is already known to the client.
void TSerialClient::AddDevice(PSerialDevice device)
{
    if (RegReader) {
        throw TSerialDeviceException("can't add registers to the active client");
    }
    for (const auto& reg: device->GetRegisters()) {
        // Single lookup: emplace reports whether the register was already present
        const bool inserted = Handlers.emplace(reg, std::make_shared<TRegisterHandler>(reg)).second;
        if (!inserted) {
            throw TSerialDeviceException("duplicate register");
        }
        RegList.push_back(reg);
        LOG(Debug) << "AddRegister: " << reg;
    }
    Devices.push_back(device);
}
170✔
71

72
void TSerialClient::Activate()
1,240✔
73
{
74
    if (!RegReader) {
1,240✔
75
        RegReader = std::make_unique<TSerialClientRegisterAndEventsReader>(Devices,
158✔
76
                                                                           GetReadEventsPeriod(*Port),
158✔
77
                                                                           NowFn,
158✔
78
                                                                           LowPriorityRateLimit);
158✔
79
        LastAccessedDevice = std::make_unique<TSerialClientDeviceAccessHandler>(RegReader->GetEventsReader());
158✔
80
    }
81
}
1,240✔
82

83
// Waits until waitUntil and runs tasks that are queued meanwhile.
//
// currentTime - the moment the wait starts; if it is later than waitUntil,
//               waitUntil is moved to currentTime.
// waitUntil   - the deadline of the wait.
//
// Tasks whose Run returns RETRY are queued again with AddTask after the wait ends.
void TSerialClient::WaitForPollAndFlush(steady_clock::time_point currentTime, steady_clock::time_point waitUntil)
{
    if (currentTime > waitUntil) {
        waitUntil = currentTime;
    }

    if (Debug.IsEnabled()) {
        // Port description and timestamp are separated by a space to keep the message readable
        LOG(Debug) << Port->GetDescription() << " "
                   << duration_cast<milliseconds>(currentTime.time_since_epoch()).count() << ": Wait until "
                   << duration_cast<milliseconds>(waitUntil.time_since_epoch()).count();
    }

    std::vector<PSerialClientTask> retryTasks;
    {
        std::unique_lock<std::mutex> lock(TasksMutex);

        while (TasksCv.wait_until(lock, waitUntil, [this]() { return !Tasks.empty(); })) {
            // Take the whole queue and run it with TasksMutex released
            std::vector<PSerialClientTask> tasks;
            Tasks.swap(tasks);
            lock.unlock();
            for (auto& task: tasks) {
                if (task->Run(Port, *LastAccessedDevice, Devices) == ISerialClientTask::TRunResult::RETRY) {
                    retryTasks.push_back(task);
                }
            }
            lock.lock();
        }
    }
    // Retried tasks are queued only after the wait, so they are not run again in this call
    for (auto& task: retryTasks) {
        AddTask(task);
    }
}
1,240✔
114

115
// Reports a polled register: the error callback is used if the register has
// a read or a write error, the read callback otherwise. An unset callback is skipped.
void TSerialClient::ProcessPolledRegister(PRegister reg)
{
    const bool hasError =
        reg->GetErrorState().test(TRegister::ReadError) || reg->GetErrorState().test(TRegister::WriteError);
    const auto& callback = hasError ? RegisterErrorCallback : RegisterReadCallback;
    if (callback) {
        callback(reg);
    }
}
2,094✔
127

128
void TSerialClient::Cycle()
1,240✔
129
{
130
    Activate();
1,240✔
131

132
    try {
133
        OpenCloseLogic.OpenIfAllowed(Port);
1,246✔
134
    } catch (const std::exception& e) {
12✔
135
        ConnectLogger.Log(e.what(), Debug, Error);
6✔
136
    }
137

138
    if (Port->IsOpen()) {
1,240✔
139
        ConnectLogger.DropTimeout();
1,232✔
140
        OpenPortCycle();
1,232✔
141
    } else {
142
        ClosedPortCycle();
8✔
143
    }
144
}
1,240✔
145

146
void TSerialClient::ClosedPortCycle()
8✔
147
{
148
    auto currentTime = NowFn();
8✔
149
    auto waitUntil = currentTime + CLOSED_PORT_CYCLE_TIME;
8✔
150
    WaitForPollAndFlush(currentTime, waitUntil);
8✔
151

152
    RegReader->ClosedPortCycle(waitUntil, [this](PRegister reg) { ProcessPolledRegister(reg); });
24✔
153
}
8✔
154

155
// Sets a new text value to the register's handler and queues a write task.
// GetHandler throws TSerialDeviceException if the register is unknown.
void TSerialClient::SetTextValue(PRegister reg, const std::string& value)
{
    auto registerHandler = GetHandler(reg);
    registerHandler->SetTextValue(value);
    AddTask(std::make_shared<TWriteChannelSerialClientTask>(registerHandler,
                                                            RegisterReadCallback,
                                                            RegisterErrorCallback));
}
230✔
163

164
// Sets the callback used for registers without read and write errors.
void TSerialClient::SetReadCallback(const TSerialClient::TRegisterCallback& callback)
{
    this->RegisterReadCallback = callback;
}
198✔
168

169
// Sets the callback used for registers with a read or a write error.
void TSerialClient::SetErrorCallback(const TSerialClient::TRegisterCallback& callback)
{
    this->RegisterErrorCallback = callback;
}
198✔
173

174
// Returns the handler of the register.
// Throws TSerialDeviceException if the register is not known to the client.
PRegisterHandler TSerialClient::GetHandler(PRegister reg) const
{
    const auto found = Handlers.find(reg);
    if (found != Handlers.end()) {
        return found->second;
    }
    throw TSerialDeviceException("register not found");
}
181

182
void TSerialClient::OpenPortCycle()
1,232✔
183
{
184
    auto currentTime = NowFn();
1,232✔
185
    auto waitUntil = RegReader->GetDeadline(currentTime);
1,232✔
186
    // Limit waiting time to be responsive
187
    waitUntil = std::min(waitUntil, currentTime + MAX_POLL_TIME);
1,232✔
188
    WaitForPollAndFlush(currentTime, waitUntil);
1,232✔
189

190
    auto device = RegReader->OpenPortCycle(
191
        *Port,
1,232✔
192
        [this](PRegister reg) { ProcessPolledRegister(reg); },
2,078✔
193
        *LastAccessedDevice);
3,696✔
194

195
    if (device) {
1,232✔
196
        OpenCloseLogic.CloseIfNeeded(Port, device->GetConnectionState() == TDeviceConnectionState::DISCONNECTED);
1,220✔
197
    }
198
}
1,232✔
199

200
// Returns the port used by the client.
PFeaturePort TSerialClient::GetPort()
{
    return this->Port;
}
204

205
std::list<PSerialDevice> TSerialClient::GetDevices()
2✔
206
{
207
    return Devices;
2✔
208
}
209

210
// Queues a task and wakes up the waiters on TasksCv.
void TSerialClient::AddTask(PSerialClientTask task)
{
    {
        // The mutex is released before the notification
        std::lock_guard<std::mutex> guard(TasksMutex);
        Tasks.push_back(task);
    }
    TasksCv.notify_all();
}
252✔
218

219
// Forwards the request to suspend polling of the device to the reader.
// NOTE(review): RegReader is created in Activate(); looks like a call
// before the first Activate() dereferences an empty pointer - confirm callers.
void TSerialClient::SuspendPoll(PSerialDevice device, std::chrono::steady_clock::time_point currentTime)
{
    auto& reader = *RegReader;
    reader.SuspendPoll(device, currentTime);
}
223

224
// Forwards the request to resume polling of the device to the reader.
// NOTE(review): RegReader is created in Activate(); looks like a call
// before the first Activate() dereferences an empty pointer - confirm callers.
void TSerialClient::ResumePoll(PSerialDevice device)
{
    auto& reader = *RegReader;
    reader.ResumePoll(device);
}
228

229
// Creates the events reader, passes the devices to the register poller and to the events reader
// and schedules the first polling entry in the time balancer at the current time.
TSerialClientRegisterAndEventsReader::TSerialClientRegisterAndEventsReader(const std::list<PSerialDevice>& devices,
                                                                           std::chrono::milliseconds readEventsPeriod,
                                                                           util::TGetNowFn nowFn,
                                                                           size_t lowPriorityRateLimit)
    : EventsReader(std::make_shared<TSerialClientEventsReader>(MAX_EVENT_READ_ERRORS)),
      RegisterPoller(lowPriorityRateLimit),
      TimeBalancer(BALANCING_THRESHOLD),
      ReadEventsPeriod(readEventsPeriod),
      SpentTime(nowFn),
      LastCycleWasTooSmallToPoll(false),
      NowFn(nowFn)
{
    const auto now = NowFn();
    RegisterPoller.SetDevices(devices, now);
    EventsReader->SetDevices(devices);
    TimeBalancer.AddEntry(TClientTaskType::POLLING, now, TPriority::Low);
}
192✔
246

247
void TSerialClientRegisterAndEventsReader::ClosedPortCycle(std::chrono::steady_clock::time_point currentTime,
8✔
248
                                                           TRegisterCallback regCallback)
249
{
250
    EventsReader->SetReadErrors(regCallback);
8✔
251
    RegisterPoller.ClosedPortCycle(currentTime, regCallback);
8✔
252
}
8✔
253

254
class TSerialClientTaskHandler
255
{
256
public:
257
    TClientTaskType TaskType;
258
    TItemAccumulationPolicy Policy;
259
    milliseconds PollLimit;
260
    bool NotReady = true;
261

262
    bool operator()(TClientTaskType task, TItemAccumulationPolicy policy, milliseconds pollLimit)
1,788✔
263
    {
264
        PollLimit = pollLimit;
1,788✔
265
        TaskType = task;
1,788✔
266
        Policy = policy;
1,788✔
267
        NotReady = false;
1,788✔
268
        return true;
1,788✔
269
    }
270
};
271

272
// Open port processing: takes the next task from the time balancer
// and either reads events or polls registers.
//
// Returns the device reported by the register poller,
// or nullptr if no task is ready or events were processed.
PSerialDevice TSerialClientRegisterAndEventsReader::OpenPortCycle(TFeaturePort& port,
                                                                  TRegisterCallback regCallback,
                                                                  TSerialClientDeviceAccessHandler& lastAccessedDevice)
{
    // Count idle time as high priority task time to faster reach time balancing threshold
    if (LastCycleWasTooSmallToPoll) {
        const auto idleTime = ceil<milliseconds>(SpentTime.GetSpentTime());
        TimeBalancer.UpdateSelectionTime(idleTime, TPriority::High);
    }

    SpentTime.Start();
    TSerialClientTaskHandler selectedTask;
    TimeBalancer.AccumulateNext(SpentTime.GetStartTime(), selectedTask, TItemSelectionPolicy::All);
    if (selectedTask.NotReady) {
        return nullptr;
    }

    if (selectedTask.TaskType == TClientTaskType::EVENTS) {
        if (EventsReader && EventsReader->HasDevicesWithEnabledEvents()) {
            lastAccessedDevice.PrepareToAccess(port, nullptr);
            EventsReader->ReadEvents(port, MAX_POLL_TIME, regCallback, NowFn);

            const auto eventsTime = ceil<milliseconds>(SpentTime.GetSpentTime());
            TimeBalancer.UpdateSelectionTime(eventsTime, TPriority::High);

            const auto nextEventsTime = SpentTime.GetStartTime() + ReadEventsPeriod;
            TimeBalancer.AddEntry(TClientTaskType::EVENTS, nextEventsTime, TPriority::High);
        }
        SpentTime.Start();

        // TODO: Need to notify port open/close logic about errors
        return nullptr;
    }

    // Some registers can have theoretical read time more than poll limit.
    // Define special cases when reading can exceed poll limit to read the registers:
    // 1. TimeBalancer can force reading of such registers.
    // 2. If there are not devices with enabled events, the only limiting timeout is MAX_POLL_TIME.
    //    We can miss it and read at least one register.
    const bool forcedByBalancer = (selectedTask.Policy == TItemAccumulationPolicy::Force);
    const bool readAtLeastOneRegister = forcedByBalancer || !EventsReader->HasDevicesWithEnabledEvents();

    const auto pollLimit = std::min(selectedTask.PollLimit, MAX_POLL_TIME);
    auto pollResult = RegisterPoller.OpenPortCycle(port,
                                                   SpentTime,
                                                   pollLimit,
                                                   readAtLeastOneRegister,
                                                   lastAccessedDevice,
                                                   regCallback);

    TimeBalancer.AddEntry(TClientTaskType::POLLING, pollResult.Deadline, TPriority::Low);
    if (!pollResult.NotEnoughTime) {
        LastCycleWasTooSmallToPoll = false;
        TimeBalancer.UpdateSelectionTime(ceil<milliseconds>(SpentTime.GetSpentTime()), TPriority::Low);
    } else {
        // The spent time is accounted at the beginning of the next call
        LastCycleWasTooSmallToPoll = true;
    }

    // Schedule events reading if there are devices with enabled events and it is not scheduled yet
    if (EventsReader->HasDevicesWithEnabledEvents() && !TimeBalancer.Contains(TClientTaskType::EVENTS)) {
        const auto firstEventsTime = SpentTime.GetStartTime() + ReadEventsPeriod;
        TimeBalancer.AddEntry(TClientTaskType::EVENTS, firstEventsTime, TPriority::High);
    }

    SpentTime.Start();
    return pollResult.Device;
}
333

334
std::chrono::steady_clock::time_point TSerialClientRegisterAndEventsReader::GetDeadline(
1,800✔
335
    std::chrono::steady_clock::time_point currentTime) const
336
{
337
    return TimeBalancer.GetDeadline();
1,800✔
338
}
339

340
// Returns the events reader created in the constructor.
PSerialClientEventsReader TSerialClientRegisterAndEventsReader::GetEventsReader() const
{
    return this->EventsReader;
}
344

345
void TSerialClientRegisterAndEventsReader::SuspendPoll(PSerialDevice device,
6✔
346
                                                       std::chrono::steady_clock::time_point currentTime)
347
{
348
    RegisterPoller.SuspendPoll(device, currentTime);
6✔
349
}
6✔
350

351
// Forwards the request to resume polling of the device to the register poller.
void TSerialClientRegisterAndEventsReader::ResumePoll(PSerialDevice device)
{
    auto& poller = RegisterPoller;
    poller.ResumePoll(device);
}
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc