• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wirenboard / wb-mqtt-serial / 1

08 Jul 2025 01:20PM UTC coverage: 73.854% (+1.0%) from 72.836%
1

Pull #963

github

39d9bc
KraPete
Bump version
Pull Request #963: Bump version

6444 of 9057 branches covered (71.15%)

12341 of 16710 relevant lines covered (73.85%)

305.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.22
/src/serial_client_register_poller.cpp
1
#include "serial_client_register_poller.h"
2
#include <chrono>
3
#include <iostream>
4
#include <unistd.h>
5

6
#include "log.h"
7
#include "serial_device.h"
8

9
using namespace std::chrono_literals;
10
using namespace std::chrono;
11

12
#define LOG(logger) logger.Log() << "[serial client] "
13

14
namespace
15
{
16
    const auto MAX_LOW_PRIORITY_LAG = 1s;
17

18
    class TDeviceReader
19
    {
20
        PRegisterRange RegisterRange;
21
        milliseconds MaxPollTime;
22
        PPollableDevice Device;
23
        bool ReadAtLeastOneRegister;
24
        const util::TSpentTimeMeter& SessionTime;
25
        TSerialClientDeviceAccessHandler& LastAccessedDevice;
26
        TPort& Port;
27

28
    public:
29
        TDeviceReader(TPort& port,
753✔
30
                      const util::TSpentTimeMeter& sessionTime,
31
                      milliseconds maxPollTime,
32
                      bool readAtLeastOneRegister,
33
                      TSerialClientDeviceAccessHandler& lastAccessedDevice)
34
            : MaxPollTime(maxPollTime),
753✔
35
              ReadAtLeastOneRegister(readAtLeastOneRegister),
36
              SessionTime(sessionTime),
37
              LastAccessedDevice(lastAccessedDevice),
38
              Port(port)
753✔
39
        {}
753✔
40

41
        bool operator()(const PPollableDevice& device, TItemAccumulationPolicy policy, milliseconds pollLimit)
792✔
42
        {
43
            if (Device) {
792✔
44
                return false;
39✔
45
            }
46

47
            pollLimit = std::min(MaxPollTime, pollLimit);
753✔
48
            if (policy != TItemAccumulationPolicy::Force) {
753✔
49
                ReadAtLeastOneRegister = false;
39✔
50
            }
51

52
            RegisterRange =
53
                device->ReadRegisterRange(Port, pollLimit, ReadAtLeastOneRegister, SessionTime, LastAccessedDevice);
753✔
54
            Device = device;
753✔
55
            return !RegisterRange->RegisterList().empty();
753✔
56
        }
57

58
        PRegisterRange GetRegisterRange() const
753✔
59
        {
60
            return RegisterRange;
753✔
61
        }
62

63
        PPollableDevice GetDevice() const
3,314✔
64
        {
65
            return Device;
3,314✔
66
        }
67
    };
68

69
    class TClosedPortDeviceReader
70
    {
71
        std::list<PRegister> Regs;
72
        steady_clock::time_point CurrentTime;
73
        PPollableDevice Device;
74

75
    public:
76
        TClosedPortDeviceReader(steady_clock::time_point currentTime): CurrentTime(currentTime)
4✔
77
        {}
4✔
78

79
        bool operator()(const PPollableDevice& device, TItemAccumulationPolicy policy, milliseconds pollLimit)
4✔
80
        {
81
            if (Device) {
4✔
82
                return false;
×
83
            }
84
            Regs = device->MarkWaitingRegistersAsReadErrorAndReschedule(CurrentTime);
4✔
85
            Device = device;
4✔
86
            return true;
4✔
87
        }
88

89
        std::list<PRegister>& GetRegisters()
16✔
90
        {
91
            return Regs;
16✔
92
        }
93

94
        PPollableDevice GetDevice() const
20✔
95
        {
96
            return Device;
20✔
97
        }
98

99
        void ClearRegisters()
8✔
100
        {
101
            Regs.clear();
8✔
102
            Device.reset();
8✔
103
        }
8✔
104
    };
105
};
106

107
TSerialClientRegisterPoller::TSerialClientRegisterPoller(size_t lowPriorityRateLimit)
92✔
108
    : Scheduler(MAX_LOW_PRIORITY_LAG),
109
      ThrottlingStateLogger(),
110
      LowPriorityRateLimiter(lowPriorityRateLimit)
92✔
111
{}
92✔
112

113
void TSerialClientRegisterPoller::SetDevices(const std::list<PSerialDevice>& devices,
92✔
114
                                             steady_clock::time_point currentTime)
115
{
116
    for (const auto& dev: devices) {
189✔
117
        auto pollableDevice = std::make_shared<TPollableDevice>(dev, currentTime, TPriority::High);
97✔
118
        if (pollableDevice->HasRegisters()) {
97✔
119
            Scheduler.AddEntry(pollableDevice, currentTime, TPriority::High);
6✔
120
            Devices.insert({dev, pollableDevice});
6✔
121
        }
122
        pollableDevice = std::make_shared<TPollableDevice>(dev, currentTime, TPriority::Low);
97✔
123
        if (pollableDevice->HasRegisters()) {
97✔
124
            Scheduler.AddEntry(pollableDevice, currentTime, TPriority::Low);
95✔
125
            Devices.insert({dev, pollableDevice});
95✔
126
        }
127
        dev->AddOnConnectionStateChangedCallback(
194✔
128
            [this](PSerialDevice device) { OnDeviceConnectionStateChanged(device); });
315✔
129
    }
130
}
92✔
131

132
void TSerialClientRegisterPoller::ScheduleNextPoll(PPollableDevice device)
720✔
133
{
134
    if (device->HasRegisters()) {
720✔
135
        Scheduler.AddEntry(device, device->GetDeadline(), device->GetPriority());
720✔
136
    }
137
}
720✔
138

139
void TSerialClientRegisterPoller::ClosedPortCycle(steady_clock::time_point currentTime, TRegisterCallback callback)
4✔
140
{
141
    Scheduler.ResetLoadBalancing();
4✔
142

143
    RescheduleDisconnectedDevices();
4✔
144

145
    TClosedPortDeviceReader reader(currentTime);
8✔
146
    do {
4✔
147
        reader.ClearRegisters();
8✔
148
        Scheduler.AccumulateNext(currentTime, reader, TItemSelectionPolicy::All);
8✔
149
        for (auto& reg: reader.GetRegisters()) {
16✔
150
            reg->SetError(TRegister::TError::ReadError);
8✔
151
            if (callback) {
8✔
152
                callback(reg);
8✔
153
            }
154
            auto device = reader.GetDevice()->GetDevice();
16✔
155
            device->SetTransferResult(false);
8✔
156
        }
157
        if (reader.GetDevice()) {
8✔
158
            ScheduleNextPoll(reader.GetDevice());
4✔
159
        }
160
    } while (!reader.GetRegisters().empty());
8✔
161
}
4✔
162

163
std::chrono::steady_clock::time_point TSerialClientRegisterPoller::GetDeadline(
716✔
164
    bool lowPriorityRateLimitIsExceeded,
165
    const util::TSpentTimeMeter& spentTime) const
166
{
167
    if (Scheduler.IsEmpty()) {
716✔
168
        return spentTime.GetStartTime() + 1s;
×
169
    }
170
    if (lowPriorityRateLimitIsExceeded) {
716✔
171
        auto lowPriorityDeadline = Scheduler.GetLowPriorityDeadline();
×
172
        // There are some low priority items
173
        if (lowPriorityDeadline != std::chrono::steady_clock::time_point::max()) {
×
174
            lowPriorityDeadline = std::max(lowPriorityDeadline, LowPriorityRateLimiter.GetStartTime() + 1s);
×
175
        }
176
        return std::min(Scheduler.GetHighPriorityDeadline(), lowPriorityDeadline);
×
177
    }
178
    return Scheduler.GetDeadline();
716✔
179
}
180

181
TPollResult TSerialClientRegisterPoller::OpenPortCycle(TPort& port,
753✔
182
                                                       const util::TSpentTimeMeter& spentTime,
183
                                                       std::chrono::milliseconds maxPollingTime,
184
                                                       bool readAtLeastOneRegister,
185
                                                       TSerialClientDeviceAccessHandler& lastAccessedDevice,
186
                                                       TRegisterCallback callback)
187
{
188
    RescheduleDisconnectedDevices();
753✔
189

190
    TPollResult res;
753✔
191

192
    TDeviceReader reader(port, spentTime, maxPollingTime, readAtLeastOneRegister, lastAccessedDevice);
1,506✔
193

194
    bool lowPriorityRateLimitIsExceeded = LowPriorityRateLimiter.IsOverLimit(spentTime.GetStartTime());
753✔
195

196
    Scheduler.AccumulateNext(spentTime.GetStartTime(),
753✔
197
                             reader,
198
                             lowPriorityRateLimitIsExceeded ? TItemSelectionPolicy::OnlyHighPriority
199
                                                            : TItemSelectionPolicy::All);
200
    if (lowPriorityRateLimitIsExceeded) {
753✔
201
        auto throttlingMsg = ThrottlingStateLogger.GetMessage();
×
202
        if (!throttlingMsg.empty()) {
×
203
            LOG(Warn) << port.GetDescription() << " " << throttlingMsg;
×
204
        }
205
    }
206
    auto range = reader.GetRegisterRange();
1,506✔
207

208
    if (!range) {
753✔
209
        // Nothing to read
210
        res.Deadline = GetDeadline(lowPriorityRateLimitIsExceeded, spentTime);
×
211
        return res;
×
212
    }
213

214
    // There are registers waiting read, but they don't fit in allowed poll limit
215
    if (range->RegisterList().empty()) {
753✔
216
        res.NotEnoughTime = true;
37✔
217
        if (reader.GetDevice()->GetPriority() == TPriority::High) {
37✔
218
            // High priority registers are limited by maxPollingTime
219
            res.Deadline = spentTime.GetStartTime() + maxPollingTime;
1✔
220
        } else {
221
            // Low priority registers are limited by high priority and maxPollingTime
222
            res.Deadline = std::min(Scheduler.GetHighPriorityDeadline(), spentTime.GetStartTime() + maxPollingTime);
36✔
223
        }
224
        return res;
37✔
225
    }
226

227
    res.Device = reader.GetDevice()->GetDevice();
716✔
228

229
    for (auto& reg: range->RegisterList()) {
1,845✔
230
        if (callback) {
1,129✔
231
            callback(reg);
1,129✔
232
        }
233
        if (reader.GetDevice()->GetPriority() == TPriority::Low) {
1,129✔
234
            LowPriorityRateLimiter.NewItem(spentTime.GetStartTime());
1,087✔
235
        }
236
    }
237
    ScheduleNextPoll(reader.GetDevice());
716✔
238

239
    Scheduler.UpdateSelectionTime(ceil<milliseconds>(spentTime.GetSpentTime()), reader.GetDevice()->GetPriority());
716✔
240
    res.Deadline = GetDeadline(lowPriorityRateLimitIsExceeded, spentTime);
716✔
241
    return res;
716✔
242
}
243

244
void TSerialClientRegisterPoller::OnDeviceConnectionStateChanged(PSerialDevice device)
121✔
245
{
246
    if (device->GetConnectionState() == TDeviceConnectionState::DISCONNECTED) {
121✔
247
        DisconnectedDevicesWaitingForReschedule.push_back(device);
17✔
248
    }
249
}
121✔
250

251
bool TPollableDeviceComparePredicate::operator()(const PPollableDevice& d1, const PPollableDevice& d2) const
14✔
252
{
253
    if (d1->GetDevice() != d2->GetDevice()) {
14✔
254
        return d1->GetDevice()->DeviceConfig()->SlaveId > d2->GetDevice()->DeviceConfig()->SlaveId;
14✔
255
    }
256
    return false;
×
257
}
258

259
TThrottlingStateLogger::TThrottlingStateLogger(): FirstTime(true)
92✔
260
{}
92✔
261

262
std::string TThrottlingStateLogger::GetMessage()
×
263
{
264
    if (FirstTime) {
×
265
        FirstTime = false;
×
266
        return "Register read rate limit is exceeded";
×
267
    }
268
    return std::string();
×
269
}
270

271
void TSerialClientRegisterPoller::RescheduleDisconnectedDevices()
757✔
272
{
273
    for (auto& device: DisconnectedDevicesWaitingForReschedule) {
774✔
274
        auto range = Devices.equal_range(device);
17✔
275
        for (auto it = range.first; it != range.second; ++it) {
35✔
276
            Scheduler.Remove(it->second);
18✔
277
            it->second->RescheduleAllRegisters();
18✔
278
            Scheduler.AddEntry(it->second, it->second->GetDeadline(), it->second->GetPriority());
18✔
279
        }
280
    }
281
    DisconnectedDevicesWaitingForReschedule.clear();
757✔
282
}
757✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc