• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
Build has been canceled!

wirenboard / wb-mqtt-serial / 6

13 Jan 2026 06:31AM UTC coverage: 76.817% (+4.0%) from 72.836%
6

Pull #1049

github

8b2ffc
Ilia1S
Remove fw from groups
Pull Request #1049: WB-MR templates: Add delays

6873 of 9161 branches covered (75.02%)

12966 of 16879 relevant lines covered (76.82%)

830.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.3
/src/serial_client_register_poller.cpp
1
#include "serial_client_register_poller.h"
2
#include <chrono>
3
#include <iostream>
4
#include <unistd.h>
5

6
#include "log.h"
7
#include "serial_device.h"
8

9
using namespace std::chrono_literals;
10
using namespace std::chrono;
11

12
// Starts a log entry on `logger` via logger.Log() and streams the "[serial client] "
// prefix into it; callers continue the message with further << operands.
#define LOG(logger) logger.Log() << "[serial client] "
13

14
namespace
15
{
16
    const auto MAX_LOW_PRIORITY_LAG = 1s;
17
    const auto SUSPEND_POLL_TIMEOUT = 10min;
18

19
    const auto DISCONNECTED_POLL_DELAY_STEP = 500ms;
20
    const auto DISCONNECTED_POLL_DELAY_LIMIT = 10s;
21

22
    class TDeviceReader
23
    {
24
        PRegisterRange RegisterRange;
25
        milliseconds MaxPollTime;
26
        PPollableDevice Device;
27
        bool ReadAtLeastOneRegister;
28
        const util::TSpentTimeMeter& SessionTime;
29
        TSerialClientDeviceAccessHandler& LastAccessedDevice;
30
        TFeaturePort& Port;
31

32
    public:
33
        TDeviceReader(TFeaturePort& port,
810✔
34
                      const util::TSpentTimeMeter& sessionTime,
35
                      milliseconds maxPollTime,
36
                      bool readAtLeastOneRegister,
37
                      TSerialClientDeviceAccessHandler& lastAccessedDevice)
38
            : MaxPollTime(maxPollTime),
810✔
39
              ReadAtLeastOneRegister(readAtLeastOneRegister),
40
              SessionTime(sessionTime),
41
              LastAccessedDevice(lastAccessedDevice),
42
              Port(port)
810✔
43
        {}
810✔
44

45
        bool operator()(const PPollableDevice& device, TItemAccumulationPolicy policy, milliseconds pollLimit)
833✔
46
        {
47
            if (Device) {
833✔
48
                return false;
38✔
49
            }
50

51
            pollLimit = std::min(MaxPollTime, pollLimit);
795✔
52
            if (policy != TItemAccumulationPolicy::Force) {
795✔
53
                ReadAtLeastOneRegister = false;
39✔
54
            }
55

56
            RegisterRange =
57
                device->ReadRegisterRange(Port, pollLimit, ReadAtLeastOneRegister, SessionTime, LastAccessedDevice);
795✔
58
            Device = device;
795✔
59
            return !RegisterRange->RegisterList().empty();
795✔
60
        }
61

62
        PRegisterRange GetRegisterRange() const
810✔
63
        {
64
            return RegisterRange;
810✔
65
        }
66

67
        PPollableDevice GetDevice() const
3,498✔
68
        {
69
            return Device;
3,498✔
70
        }
71
    };
72

73
    class TClosedPortDeviceReader
74
    {
75
        std::list<PRegister> Regs;
76
        steady_clock::time_point CurrentTime;
77
        PPollableDevice Device;
78

79
    public:
80
        TClosedPortDeviceReader(steady_clock::time_point currentTime): CurrentTime(currentTime)
4✔
81
        {}
4✔
82

83
        bool operator()(const PPollableDevice& device, TItemAccumulationPolicy policy, milliseconds pollLimit)
4✔
84
        {
85
            if (Device) {
4✔
86
                return false;
×
87
            }
88
            Regs = device->MarkWaitingRegistersAsReadErrorAndReschedule(CurrentTime);
4✔
89
            Device = device;
4✔
90
            return true;
4✔
91
        }
92

93
        std::list<PRegister>& GetRegisters()
16✔
94
        {
95
            return Regs;
16✔
96
        }
97

98
        PPollableDevice GetDevice() const
20✔
99
        {
100
            return Device;
20✔
101
        }
102

103
        void ClearRegisters()
8✔
104
        {
105
            Regs.clear();
8✔
106
            Device.reset();
8✔
107
        }
8✔
108
    };
109
};
110

111
// Builds a poller whose scheduler is configured with MAX_LOW_PRIORITY_LAG and whose
// low priority rate limiter is configured with lowPriorityRateLimit.
TSerialClientRegisterPoller::TSerialClientRegisterPoller(size_t lowPriorityRateLimit)
    : Scheduler(MAX_LOW_PRIORITY_LAG), ThrottlingStateLogger(), LowPriorityRateLimiter(lowPriorityRateLimit)
{
}
96✔
116

117
void TSerialClientRegisterPoller::SetDevices(const std::list<PSerialDevice>& devices,
96✔
118
                                             steady_clock::time_point currentTime)
119
{
120
    std::unique_lock lock(Mutex);
192✔
121

122
    for (const auto& dev: devices) {
197✔
123
        auto pollableDevice = std::make_shared<TPollableDevice>(dev, currentTime, TPriority::High);
101✔
124
        if (pollableDevice->HasRegisters()) {
101✔
125
            Scheduler.AddEntry(pollableDevice, currentTime, TPriority::High);
6✔
126
            Devices.insert({dev, pollableDevice});
6✔
127
        }
128
        pollableDevice = std::make_shared<TPollableDevice>(dev, currentTime, TPriority::Low);
101✔
129
        if (pollableDevice->HasRegisters()) {
101✔
130
            Scheduler.AddEntry(pollableDevice, currentTime, TPriority::Low);
99✔
131
            Devices.insert({dev, pollableDevice});
99✔
132
        }
133
        dev->AddOnConnectionStateChangedCallback(
202✔
134
            [this](PSerialDevice device) { OnDeviceConnectionStateChanged(device); });
335✔
135
    }
136
}
96✔
137

138
// Puts the device back into the scheduler if it still has registers.
// A connected device is scheduled at its own deadline and its disconnected poll delay is reset to 0.
// A disconnected device with a non-zero stored delay is scheduled at currentTime + delay instead;
// the stored delay then grows by DISCONNECTED_POLL_DELAY_STEP while it is below DISCONNECTED_POLL_DELAY_LIMIT.
void TSerialClientRegisterPoller::ScheduleNextPoll(PPollableDevice device, steady_clock::time_point currentTime)
{
    if (!device->HasRegisters()) {
        return;
    }

    milliseconds nextDelay(0);
    auto deadline = device->GetDeadline();

    const bool isDisconnected =
        device->GetDevice()->GetConnectionState() == TDeviceConnectionState::DISCONNECTED;
    if (isDisconnected) {
        const milliseconds currentDelay = device->GetDisconnectedPollDelay();
        if (currentDelay.count() != 0) {
            deadline = currentTime + currentDelay;
            LOG(Debug) << "Device " << device->GetDevice()->ToString() << " poll delayed for "
                       << currentDelay.count() << " ms";
        }
        nextDelay = currentDelay;
        if (nextDelay < DISCONNECTED_POLL_DELAY_LIMIT) {
            nextDelay += DISCONNECTED_POLL_DELAY_STEP;
        }
    }

    device->SetDisconnectedPollDelay(nextDelay);
    Scheduler.AddEntry(device, deadline, device->GetPriority());
}
762✔
158

159
// Poll cycle used while the port is closed.
// Devices selected by the scheduler get their waiting registers marked with a read error;
// each such register is reported through callback (if set) and the owning device
// is told that the transfer failed. The loop stops on the first pass that yields no registers.
void TSerialClientRegisterPoller::ClosedPortCycle(steady_clock::time_point currentTime, TRegisterCallback callback)
{
    Scheduler.ResetLoadBalancing();

    RescheduleDisconnectedDevices();
    RescheduleDevicesWithSpendedPoll(currentTime);

    std::unique_lock lock(Mutex);

    TClosedPortDeviceReader closedPortReader(currentTime);
    while (true) {
        closedPortReader.ClearRegisters();
        Scheduler.AccumulateNext(currentTime, closedPortReader, TItemSelectionPolicy::All);

        for (auto& failedRegister: closedPortReader.GetRegisters()) {
            failedRegister->SetError(TRegister::TError::ReadError);
            if (callback) {
                callback(failedRegister);
            }
            // Reported once per failed register
            auto serialDevice = closedPortReader.GetDevice()->GetDevice();
            serialDevice->SetTransferResult(false);
        }

        if (auto handledDevice = closedPortReader.GetDevice()) {
            ScheduleNextPoll(handledDevice, currentTime);
        }

        if (closedPortReader.GetRegisters().empty()) {
            break;
        }
    }
}
4✔
185

186
std::chrono::steady_clock::time_point TSerialClientRegisterPoller::GetDeadline(
773✔
187
    bool lowPriorityRateLimitIsExceeded,
188
    const util::TSpentTimeMeter& spentTime) const
189
{
190
    if (Scheduler.IsEmpty()) {
773✔
191
        return spentTime.GetStartTime() + 1s;
15✔
192
    }
193
    if (lowPriorityRateLimitIsExceeded) {
758✔
194
        auto lowPriorityDeadline = Scheduler.GetLowPriorityDeadline();
×
195
        // There are some low priority items
196
        if (lowPriorityDeadline != std::chrono::steady_clock::time_point::max()) {
×
197
            lowPriorityDeadline = std::max(lowPriorityDeadline, LowPriorityRateLimiter.GetStartTime() + 1s);
×
198
        }
199
        return std::min(Scheduler.GetHighPriorityDeadline(), lowPriorityDeadline);
×
200
    }
201
    return Scheduler.GetDeadline();
758✔
202
}
203

204
// Poll cycle used while the port is open: reads one register range from the next scheduled device.
//
// port                   - port passed to the device for reading and used in the throttling warning
// spentTime              - time meter of the current cycle; its start time is used as "now"
// maxPollingTime         - upper bound of the time allowed for the read
// readAtLeastOneRegister - forwarded to the device reader
// lastAccessedDevice     - forwarded to the device reader
// callback               - if set, called for every register of the range that was read
//
// Returns:
//  - only Deadline set, if the scheduler selected nothing;
//  - NotEnoughTime = true and Deadline set, if registers are waiting but do not fit the poll limit;
//  - Device and Deadline set, if a range was read. In this case the device is rescheduled
//    and every register read from a low priority device is counted by the rate limiter.
TPollResult TSerialClientRegisterPoller::OpenPortCycle(TFeaturePort& port,
                                                       const util::TSpentTimeMeter& spentTime,
                                                       std::chrono::milliseconds maxPollingTime,
                                                       bool readAtLeastOneRegister,
                                                       TSerialClientDeviceAccessHandler& lastAccessedDevice,
                                                       TRegisterCallback callback)
{
    RescheduleDisconnectedDevices();
    RescheduleDevicesWithSpendedPoll(spentTime.GetStartTime());

    std::unique_lock lock(Mutex);

    TPollResult res;

    // The start time does not change during the cycle, fetch it once
    const auto startTime = spentTime.GetStartTime();

    TDeviceReader reader(port, spentTime, maxPollingTime, readAtLeastOneRegister, lastAccessedDevice);

    const bool lowPriorityRateLimitIsExceeded = LowPriorityRateLimiter.IsOverLimit(startTime);

    Scheduler.AccumulateNext(startTime,
                             reader,
                             lowPriorityRateLimitIsExceeded ? TItemSelectionPolicy::OnlyHighPriority
                                                            : TItemSelectionPolicy::All);
    if (lowPriorityRateLimitIsExceeded) {
        const auto throttlingMsg = ThrottlingStateLogger.GetMessage();
        if (!throttlingMsg.empty()) {
            LOG(Warn) << port.GetDescription() << " " << throttlingMsg;
        }
    }

    const auto range = reader.GetRegisterRange();
    if (!range) {
        // Nothing to read
        res.Deadline = GetDeadline(lowPriorityRateLimitIsExceeded, spentTime);
        return res;
    }

    // A range exists only if the reader handled a device, so the device is not empty here.
    // Keep one copy instead of copying the shared pointer for every register.
    const auto polledDevice = reader.GetDevice();
    const auto priority = polledDevice->GetPriority();

    // There are registers waiting read, but they don't fit in allowed poll limit
    if (range->RegisterList().empty()) {
        res.NotEnoughTime = true;
        if (priority == TPriority::High) {
            // High priority registers are limited by maxPollingTime
            res.Deadline = startTime + maxPollingTime;
        } else {
            // Low priority registers are limited by high priority and maxPollingTime
            res.Deadline = std::min(Scheduler.GetHighPriorityDeadline(), startTime + maxPollingTime);
        }
        return res;
    }

    res.Device = polledDevice->GetDevice();

    const bool isLowPriority = (priority == TPriority::Low);
    for (auto& reg: range->RegisterList()) {
        if (callback) {
            callback(reg);
        }
        if (isLowPriority) {
            LowPriorityRateLimiter.NewItem(startTime);
        }
    }
    ScheduleNextPoll(polledDevice, startTime);

    Scheduler.UpdateSelectionTime(ceil<milliseconds>(spentTime.GetSpentTime()), priority);
    res.Deadline = GetDeadline(lowPriorityRateLimitIsExceeded, spentTime);
    return res;
}
269

270
// Suspends polling of the device until currentTime + SUSPEND_POLL_TIMEOUT.
// On the first suspension all pollable entries of the device are removed from the scheduler;
// suspending an already suspended device only moves its resume time.
// The device is switched to disconnected state if it is not disconnected yet.
// Throws std::runtime_error if the device is not known to the poller.
void TSerialClientRegisterPoller::SuspendPoll(PSerialDevice device, std::chrono::steady_clock::time_point currentTime)
{
    std::unique_lock lock(Mutex);

    const bool alreadySuspended = DevicesWithSpendedPoll.find(device) != DevicesWithSpendedPoll.end();
    if (!alreadySuspended) {
        const auto pollables = Devices.equal_range(device);
        if (pollables.first == pollables.second) {
            throw std::runtime_error("Device " + device->ToString() + " is not included to poll");
        }
        for (auto entry = pollables.first; entry != pollables.second; ++entry) {
            Scheduler.Remove(entry->second);
        }
    }

    // The device must be registered as suspended before its connection state is changed below
    DevicesWithSpendedPoll[device] = currentTime + SUSPEND_POLL_TIMEOUT;
    LOG(Info) << "Device " << device->ToString() << " poll suspended";

    const bool isDisconnected = device->GetConnectionState() == TDeviceConnectionState::DISCONNECTED;
    if (!isDisconnected) {
        device->SetDisconnected();
    }
}
3✔
291

292
// Resumes polling of a suspended device: all registers of its pollable entries are rescheduled,
// the entries are returned to the scheduler and the device is removed from the suspended set.
// Throws std::runtime_error if the device is not suspended.
void TSerialClientRegisterPoller::ResumePoll(PSerialDevice device)
{
    std::unique_lock lock(Mutex);

    const bool isSuspended = DevicesWithSpendedPoll.find(device) != DevicesWithSpendedPoll.end();
    if (!isSuspended) {
        throw std::runtime_error("Device " + device->ToString() + " poll is not suspended");
    }

    const auto pollables = Devices.equal_range(device);
    for (auto entry = pollables.first; entry != pollables.second; ++entry) {
        const auto& pollable = entry->second;
        pollable->RescheduleAllRegisters();
        Scheduler.AddEntry(pollable, pollable->GetDeadline(), pollable->GetPriority());
    }

    DevicesWithSpendedPoll.erase(device);
    LOG(Info) << "Device " << device->ToString() << " poll resumed";
}
3✔
309

310
// Connection state callback.
// A device that became disconnected and is not suspended is queued for rescheduling;
// every other state change is ignored.
void TSerialClientRegisterPoller::OnDeviceConnectionStateChanged(PSerialDevice device)
{
    if (device->GetConnectionState() != TDeviceConnectionState::DISCONNECTED) {
        return;
    }
    if (DevicesWithSpendedPoll.find(device) != DevicesWithSpendedPoll.end()) {
        // Suspended devices are not rescheduled
        return;
    }
    DisconnectedDevicesWaitingForReschedule.push_back(device);
}
133✔
318

319
bool TPollableDeviceComparePredicate::operator()(const PPollableDevice& d1, const PPollableDevice& d2) const
14✔
320
{
321
    if (d1->GetDevice() != d2->GetDevice()) {
14✔
322
        return d1->GetDevice()->DeviceConfig()->SlaveId > d2->GetDevice()->DeviceConfig()->SlaveId;
14✔
323
    }
324
    return false;
×
325
}
326

327
// A new logger has not produced its message yet.
TThrottlingStateLogger::TThrottlingStateLogger()
    : FirstTime(true)
{
}
96✔
329

330
std::string TThrottlingStateLogger::GetMessage()
×
331
{
332
    if (FirstTime) {
×
333
        FirstTime = false;
×
334
        return "Register read rate limit is exceeded";
×
335
    }
336
    return std::string();
×
337
}
338

339
void TSerialClientRegisterPoller::RescheduleDisconnectedDevices()
814✔
340
{
341
    std::unique_lock lock(Mutex);
1,628✔
342
    for (auto& device: DisconnectedDevicesWaitingForReschedule) {
832✔
343
        auto range = Devices.equal_range(device);
18✔
344
        for (auto it = range.first; it != range.second; ++it) {
37✔
345
            Scheduler.Remove(it->second);
19✔
346
            it->second->RescheduleAllRegisters();
19✔
347
            Scheduler.AddEntry(it->second, it->second->GetDeadline(), it->second->GetPriority());
19✔
348
        }
349
    }
350
    DisconnectedDevicesWaitingForReschedule.clear();
814✔
351
}
814✔
352

353
void TSerialClientRegisterPoller::RescheduleDevicesWithSpendedPoll(std::chrono::steady_clock::time_point currentTime)
814✔
354
{
355
    std::list<PSerialDevice> list;
1,628✔
356
    {
357
        std::unique_lock lock(Mutex);
1,628✔
358
        for (auto it = DevicesWithSpendedPoll.begin(); it != DevicesWithSpendedPoll.end(); ++it) {
830✔
359
            if (it->second <= currentTime) {
16✔
360
                list.push_back(it->first);
1✔
361
            }
362
        }
363
    }
364
    for (auto device: list) {
815✔
365
        ResumePoll(device);
1✔
366
    }
367
}
814✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc