• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wirenboard / wb-mqtt-serial / 2

29 Dec 2025 12:28PM UTC coverage: 76.817% (+4.0%) from 72.836%
2

Pull #1045

github

54aa0c
pgasheev
up changelog
Pull Request #1045: Fix firmware version in WB-M1W2 template

6873 of 9161 branches covered (75.02%)

12966 of 16879 relevant lines covered (76.82%)

1651.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.3
/src/serial_client_register_poller.cpp
1
#include "serial_client_register_poller.h"
2
#include <chrono>
3
#include <iostream>
4
#include <unistd.h>
5

6
#include "log.h"
7
#include "serial_device.h"
8

9
using namespace std::chrono_literals;
10
using namespace std::chrono;
11

12
#define LOG(logger) logger.Log() << "[serial client] "
13

14
namespace
15
{
16
    // Value handed to the scheduler when the poller is constructed.
    constexpr auto MAX_LOW_PRIORITY_LAG = 1s;
    // A suspended device is resumed automatically once this much time has passed.
    constexpr auto SUSPEND_POLL_TIMEOUT = 10min;

    // Every poll of a disconnected device enlarges its next poll delay by this step...
    constexpr auto DISCONNECTED_POLL_DELAY_STEP = 500ms;
    // ...for as long as the stored delay is still below this limit.
    constexpr auto DISCONNECTED_POLL_DELAY_LIMIT = 10s;
21

22
    class TDeviceReader
23
    {
24
        PRegisterRange RegisterRange;
25
        milliseconds MaxPollTime;
26
        PPollableDevice Device;
27
        bool ReadAtLeastOneRegister;
28
        const util::TSpentTimeMeter& SessionTime;
29
        TSerialClientDeviceAccessHandler& LastAccessedDevice;
30
        TFeaturePort& Port;
31

32
    public:
33
        TDeviceReader(TFeaturePort& port,
1,620✔
34
                      const util::TSpentTimeMeter& sessionTime,
35
                      milliseconds maxPollTime,
36
                      bool readAtLeastOneRegister,
37
                      TSerialClientDeviceAccessHandler& lastAccessedDevice)
38
            : MaxPollTime(maxPollTime),
1,620✔
39
              ReadAtLeastOneRegister(readAtLeastOneRegister),
40
              SessionTime(sessionTime),
41
              LastAccessedDevice(lastAccessedDevice),
42
              Port(port)
1,620✔
43
        {}
1,620✔
44

45
        bool operator()(const PPollableDevice& device, TItemAccumulationPolicy policy, milliseconds pollLimit)
1,666✔
46
        {
47
            if (Device) {
1,666✔
48
                return false;
76✔
49
            }
50

51
            pollLimit = std::min(MaxPollTime, pollLimit);
1,590✔
52
            if (policy != TItemAccumulationPolicy::Force) {
1,590✔
53
                ReadAtLeastOneRegister = false;
78✔
54
            }
55

56
            RegisterRange =
57
                device->ReadRegisterRange(Port, pollLimit, ReadAtLeastOneRegister, SessionTime, LastAccessedDevice);
1,590✔
58
            Device = device;
1,590✔
59
            return !RegisterRange->RegisterList().empty();
1,590✔
60
        }
61

62
        PRegisterRange GetRegisterRange() const
1,620✔
63
        {
64
            return RegisterRange;
1,620✔
65
        }
66

67
        PPollableDevice GetDevice() const
6,996✔
68
        {
69
            return Device;
6,996✔
70
        }
71
    };
72

73
    class TClosedPortDeviceReader
74
    {
75
        std::list<PRegister> Regs;
76
        steady_clock::time_point CurrentTime;
77
        PPollableDevice Device;
78

79
    public:
80
        TClosedPortDeviceReader(steady_clock::time_point currentTime): CurrentTime(currentTime)
8✔
81
        {}
8✔
82

83
        bool operator()(const PPollableDevice& device, TItemAccumulationPolicy policy, milliseconds pollLimit)
8✔
84
        {
85
            if (Device) {
8✔
86
                return false;
×
87
            }
88
            Regs = device->MarkWaitingRegistersAsReadErrorAndReschedule(CurrentTime);
8✔
89
            Device = device;
8✔
90
            return true;
8✔
91
        }
92

93
        std::list<PRegister>& GetRegisters()
32✔
94
        {
95
            return Regs;
32✔
96
        }
97

98
        PPollableDevice GetDevice() const
40✔
99
        {
100
            return Device;
40✔
101
        }
102

103
        void ClearRegisters()
16✔
104
        {
105
            Regs.clear();
16✔
106
            Device.reset();
16✔
107
        }
16✔
108
    };
109
};
110

111
// Builds a poller whose scheduler is constructed with MAX_LOW_PRIORITY_LAG and whose
// low priority rate limiter is constructed with lowPriorityRateLimit.
// ThrottlingStateLogger needs no explicit initialiser: its default constructor is
// user-provided and runs either way.
TSerialClientRegisterPoller::TSerialClientRegisterPoller(size_t lowPriorityRateLimit)
    : Scheduler(MAX_LOW_PRIORITY_LAG),
      LowPriorityRateLimiter(lowPriorityRateLimit)
{
}
192✔
116

117
void TSerialClientRegisterPoller::SetDevices(const std::list<PSerialDevice>& devices,
192✔
118
                                             steady_clock::time_point currentTime)
119
{
120
    std::unique_lock lock(Mutex);
384✔
121

122
    for (const auto& dev: devices) {
394✔
123
        auto pollableDevice = std::make_shared<TPollableDevice>(dev, currentTime, TPriority::High);
202✔
124
        if (pollableDevice->HasRegisters()) {
202✔
125
            Scheduler.AddEntry(pollableDevice, currentTime, TPriority::High);
12✔
126
            Devices.insert({dev, pollableDevice});
12✔
127
        }
128
        pollableDevice = std::make_shared<TPollableDevice>(dev, currentTime, TPriority::Low);
202✔
129
        if (pollableDevice->HasRegisters()) {
202✔
130
            Scheduler.AddEntry(pollableDevice, currentTime, TPriority::Low);
198✔
131
            Devices.insert({dev, pollableDevice});
198✔
132
        }
133
        dev->AddOnConnectionStateChangedCallback(
404✔
134
            [this](PSerialDevice device) { OnDeviceConnectionStateChanged(device); });
670✔
135
    }
136
}
192✔
137

138
// Puts a device back into the scheduler after a poll.
// Devices without registers are ignored. For a disconnected device the stored
// disconnected-poll delay, when non-zero, replaces the device's own deadline with
// currentTime + delay, and the stored delay is then enlarged by
// DISCONNECTED_POLL_DELAY_STEP while it is below DISCONNECTED_POLL_DELAY_LIMIT.
// For any other connection state the stored delay is reset to zero.
void TSerialClientRegisterPoller::ScheduleNextPoll(PPollableDevice device, steady_clock::time_point currentTime)
{
    if (!device->HasRegisters()) {
        return;
    }

    auto nextDelay = milliseconds(0);
    auto deadline = device->GetDeadline();

    const bool disconnected =
        (device->GetDevice()->GetConnectionState() == TDeviceConnectionState::DISCONNECTED);
    if (disconnected) {
        nextDelay = device->GetDisconnectedPollDelay();
        if (nextDelay.count() != 0) {
            deadline = currentTime + nextDelay;
            LOG(Debug) << "Device " << device->GetDevice()->ToString() << " poll delayed for " << nextDelay.count()
                       << " ms";
        }
        if (nextDelay < DISCONNECTED_POLL_DELAY_LIMIT) {
            nextDelay += DISCONNECTED_POLL_DELAY_STEP;
        }
    }

    device->SetDisconnectedPollDelay(nextDelay);
    Scheduler.AddEntry(device, deadline, device->GetPriority());
}
1,524✔
158

159
// Poll cycle used while the port is closed.
// Resets scheduler load balancing, processes pending reschedules and expired
// suspensions, then repeatedly takes the next device from the scheduler. Every
// register returned for that device gets a ReadError, is reported through callback
// (if one is set), and the device is told that the transfer failed once per register.
// The accepted device is scheduled for its next poll. The loop ends after the first
// pass that yields no registers.
void TSerialClientRegisterPoller::ClosedPortCycle(steady_clock::time_point currentTime, TRegisterCallback callback)
{
    Scheduler.ResetLoadBalancing();

    RescheduleDisconnectedDevices();
    RescheduleDevicesWithSpendedPoll(currentTime);

    std::unique_lock lock(Mutex);

    TClosedPortDeviceReader reader(currentTime);
    for (;;) {
        reader.ClearRegisters();
        Scheduler.AccumulateNext(currentTime, reader, TItemSelectionPolicy::All);

        for (auto& failedRegister: reader.GetRegisters()) {
            failedRegister->SetError(TRegister::TError::ReadError);
            if (callback) {
                callback(failedRegister);
            }
            reader.GetDevice()->GetDevice()->SetTransferResult(false);
        }

        if (auto polledDevice = reader.GetDevice()) {
            ScheduleNextPoll(polledDevice, currentTime);
        }

        if (reader.GetRegisters().empty()) {
            break;
        }
    }
}
8✔
185

186
std::chrono::steady_clock::time_point TSerialClientRegisterPoller::GetDeadline(
1,546✔
187
    bool lowPriorityRateLimitIsExceeded,
188
    const util::TSpentTimeMeter& spentTime) const
189
{
190
    if (Scheduler.IsEmpty()) {
1,546✔
191
        return spentTime.GetStartTime() + 1s;
30✔
192
    }
193
    if (lowPriorityRateLimitIsExceeded) {
1,516✔
194
        auto lowPriorityDeadline = Scheduler.GetLowPriorityDeadline();
×
195
        // There are some low priority items
196
        if (lowPriorityDeadline != std::chrono::steady_clock::time_point::max()) {
×
197
            lowPriorityDeadline = std::max(lowPriorityDeadline, LowPriorityRateLimiter.GetStartTime() + 1s);
×
198
        }
199
        return std::min(Scheduler.GetHighPriorityDeadline(), lowPriorityDeadline);
×
200
    }
201
    return Scheduler.GetDeadline();
1,516✔
202
}
203

204
// Poll cycle used while the port is open: reads one register range from the next
// scheduled device.
// The returned TPollResult always has Deadline set. NotEnoughTime is set when a device
// was selected but its read returned an empty register list. Device is set only when
// registers were actually read; in that case every register is passed to callback
// (if one is set), low priority registers are counted by the rate limiter, and the
// device is scheduled for its next poll.
TPollResult TSerialClientRegisterPoller::OpenPortCycle(TFeaturePort& port,
                                                       const util::TSpentTimeMeter& spentTime,
                                                       std::chrono::milliseconds maxPollingTime,
                                                       bool readAtLeastOneRegister,
                                                       TSerialClientDeviceAccessHandler& lastAccessedDevice,
                                                       TRegisterCallback callback)
{
    RescheduleDisconnectedDevices();
    RescheduleDevicesWithSpendedPoll(spentTime.GetStartTime());

    std::unique_lock lock(Mutex);

    TPollResult res;
    TDeviceReader reader(port, spentTime, maxPollingTime, readAtLeastOneRegister, lastAccessedDevice);

    const bool lowPriorityRateLimitIsExceeded = LowPriorityRateLimiter.IsOverLimit(spentTime.GetStartTime());
    const auto selectionPolicy =
        lowPriorityRateLimitIsExceeded ? TItemSelectionPolicy::OnlyHighPriority : TItemSelectionPolicy::All;

    Scheduler.AccumulateNext(spentTime.GetStartTime(), reader, selectionPolicy);

    if (lowPriorityRateLimitIsExceeded) {
        // GetMessage() yields text only on its first call, so the warning appears once.
        const auto throttlingMsg = ThrottlingStateLogger.GetMessage();
        if (!throttlingMsg.empty()) {
            LOG(Warn) << port.GetDescription() << " " << throttlingMsg;
        }
    }

    const auto range = reader.GetRegisterRange();
    if (!range) {
        // Nothing to read
        res.Deadline = GetDeadline(lowPriorityRateLimitIsExceeded, spentTime);
        return res;
    }

    // The reader's device does not change after AccumulateNext() has returned.
    const auto polledDevice = reader.GetDevice();

    // There are registers waiting read, but they don't fit in allowed poll limit
    if (range->RegisterList().empty()) {
        res.NotEnoughTime = true;
        const auto pollingTimeLimit = spentTime.GetStartTime() + maxPollingTime;
        if (polledDevice->GetPriority() == TPriority::High) {
            // High priority registers are limited by maxPollingTime
            res.Deadline = pollingTimeLimit;
        } else {
            // Low priority registers are limited by high priority and maxPollingTime
            res.Deadline = std::min(Scheduler.GetHighPriorityDeadline(), pollingTimeLimit);
        }
        return res;
    }

    res.Device = polledDevice->GetDevice();

    for (auto& reg: range->RegisterList()) {
        if (callback) {
            callback(reg);
        }
        if (polledDevice->GetPriority() == TPriority::Low) {
            LowPriorityRateLimiter.NewItem(spentTime.GetStartTime());
        }
    }
    ScheduleNextPoll(polledDevice, spentTime.GetStartTime());

    Scheduler.UpdateSelectionTime(ceil<milliseconds>(spentTime.GetSpentTime()), polledDevice->GetPriority());
    res.Deadline = GetDeadline(lowPriorityRateLimitIsExceeded, spentTime);
    return res;
}
269

270
// Takes a device out of polling until currentTime + SUSPEND_POLL_TIMEOUT.
// On the first suspension all scheduler entries of the device are removed; calling
// this again for an already suspended device only renews the timeout.
// The device is switched to the disconnected state if it is not there already.
// Throws std::runtime_error if the device is not suspended yet and has no pollable
// entries in Devices.
void TSerialClientRegisterPoller::SuspendPoll(PSerialDevice device, std::chrono::steady_clock::time_point currentTime)
{
    std::unique_lock lock(Mutex);

    const bool alreadySuspended = (DevicesWithSpendedPoll.find(device) != DevicesWithSpendedPoll.end());
    if (!alreadySuspended) {
        auto [entry, last] = Devices.equal_range(device);
        if (entry == last) {
            throw std::runtime_error("Device " + device->ToString() + " is not included to poll");
        }
        while (entry != last) {
            Scheduler.Remove(entry->second);
            ++entry;
        }
    }

    // Recorded before SetDisconnected() so that the connection state callback
    // finds the device in DevicesWithSpendedPoll.
    DevicesWithSpendedPoll[device] = currentTime + SUSPEND_POLL_TIMEOUT;
    LOG(Info) << "Device " << device->ToString() << " poll suspended";

    const bool stillConnected = (device->GetConnectionState() != TDeviceConnectionState::DISCONNECTED);
    if (stillConnected) {
        device->SetDisconnected();
    }
}
6✔
291

292
// Returns a suspended device to polling: all registers of each of its pollable entries
// are rescheduled, the entries are added back to the scheduler, and the device is
// removed from DevicesWithSpendedPoll.
// Throws std::runtime_error if the device is not currently suspended.
void TSerialClientRegisterPoller::ResumePoll(PSerialDevice device)
{
    std::unique_lock lock(Mutex);

    const bool suspended = (DevicesWithSpendedPoll.find(device) != DevicesWithSpendedPoll.end());
    if (!suspended) {
        throw std::runtime_error("Device " + device->ToString() + " poll is not suspended");
    }

    auto [entry, last] = Devices.equal_range(device);
    for (; entry != last; ++entry) {
        auto& pollable = entry->second;
        pollable->RescheduleAllRegisters();
        Scheduler.AddEntry(pollable, pollable->GetDeadline(), pollable->GetPriority());
    }

    DevicesWithSpendedPoll.erase(device);
    LOG(Info) << "Device " << device->ToString() << " poll resumed";
}
6✔
309

310
// Connection state callback installed on every device by SetDevices().
// A device that is now disconnected and is not suspended is queued in
// DisconnectedDevicesWaitingForReschedule; every other change is ignored.
// NOTE(review): Mutex is not taken here although the same containers are accessed
// under it elsewhere. Presumably this is deliberate, because the callback can fire
// while the poller already holds the lock (SuspendPoll() calls SetDisconnected()
// under it) - confirm before adding locking.
void TSerialClientRegisterPoller::OnDeviceConnectionStateChanged(PSerialDevice device)
{
    if (device->GetConnectionState() != TDeviceConnectionState::DISCONNECTED) {
        return;
    }
    if (DevicesWithSpendedPoll.find(device) != DevicesWithSpendedPoll.end()) {
        return;
    }
    DisconnectedDevicesWaitingForReschedule.push_back(device);
}
266✔
318

319
bool TPollableDeviceComparePredicate::operator()(const PPollableDevice& d1, const PPollableDevice& d2) const
28✔
320
{
321
    if (d1->GetDevice() != d2->GetDevice()) {
28✔
322
        return d1->GetDevice()->DeviceConfig()->SlaveId > d2->GetDevice()->DeviceConfig()->SlaveId;
28✔
323
    }
324
    return false;
×
325
}
326

327
TThrottlingStateLogger::TThrottlingStateLogger(): FirstTime(true)
192✔
328
{}
192✔
329

330
std::string TThrottlingStateLogger::GetMessage()
×
331
{
332
    if (FirstTime) {
×
333
        FirstTime = false;
×
334
        return "Register read rate limit is exceeded";
×
335
    }
336
    return std::string();
×
337
}
338

339
void TSerialClientRegisterPoller::RescheduleDisconnectedDevices()
1,628✔
340
{
341
    std::unique_lock lock(Mutex);
3,256✔
342
    for (auto& device: DisconnectedDevicesWaitingForReschedule) {
1,664✔
343
        auto range = Devices.equal_range(device);
36✔
344
        for (auto it = range.first; it != range.second; ++it) {
74✔
345
            Scheduler.Remove(it->second);
38✔
346
            it->second->RescheduleAllRegisters();
38✔
347
            Scheduler.AddEntry(it->second, it->second->GetDeadline(), it->second->GetPriority());
38✔
348
        }
349
    }
350
    DisconnectedDevicesWaitingForReschedule.clear();
1,628✔
351
}
1,628✔
352

353
void TSerialClientRegisterPoller::RescheduleDevicesWithSpendedPoll(std::chrono::steady_clock::time_point currentTime)
1,628✔
354
{
355
    std::list<PSerialDevice> list;
3,256✔
356
    {
357
        std::unique_lock lock(Mutex);
3,256✔
358
        for (auto it = DevicesWithSpendedPoll.begin(); it != DevicesWithSpendedPoll.end(); ++it) {
1,660✔
359
            if (it->second <= currentTime) {
32✔
360
                list.push_back(it->first);
2✔
361
            }
362
        }
363
    }
364
    for (auto device: list) {
1,630✔
365
        ResumePoll(device);
2✔
366
    }
367
}
1,628✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc