• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wirenboard / wb-mqtt-serial / 6

02 Jul 2025 11:08AM UTC coverage: 73.768% (-0.07%) from 73.835%
6

Pull #956

github

u236
update changelog
Pull Request #956: Refactor device/LoadConfig RPC

6427 of 9048 branches covered (71.03%)

0 of 50 new or added lines in 4 files covered. (0.0%)

2 existing lines in 2 files now uncovered.

12278 of 16644 relevant lines covered (73.77%)

306.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.22
/src/serial_client.cpp
1
#include "serial_client.h"
2
#include <chrono>
3
#include <iostream>
4
#include <unistd.h>
5

6
#include "modbus_ext_common.h"
7
#include "write_channel_serial_client_task.h"
8

9
using namespace std::chrono_literals;
10
using namespace std::chrono;
11

12
#define LOG(logger) logger.Log() << "[serial client] "
13

14
namespace
15
{
16
    const auto PORT_OPEN_ERROR_NOTIFICATION_INTERVAL = 5min;
17
    const auto CLOSED_PORT_CYCLE_TIME = 500ms;
18
    const auto MAX_POLL_TIME = 100ms;
19
    // const auto MAX_FLUSHES_WHEN_POLL_IS_DUE = 20;
20
    const auto BALANCING_THRESHOLD = 500ms;
21
    // const auto MIN_READ_EVENTS_TIME = 25ms;
22
    const size_t MAX_EVENT_READ_ERRORS = 10;
23

24
    std::chrono::milliseconds GetReadEventsPeriod(const TPort& port)
79✔
25
    {
26
        auto sendByteTime = port.GetSendTimeBytes(1);
79✔
27
        // >= 115200
28
        if (sendByteTime < 100us) {
79✔
29
            return 50ms;
2✔
30
        }
31
        // >= 38400
32
        if (sendByteTime < 300us) {
77✔
33
            return 100ms;
×
34
        }
35
        // < 38400
36
        return 200ms;
77✔
37
    }
38
};
39

40
// Creates a client for the given port.
//   port                 - port the client opens, polls and closes
//   openCloseSettings    - settings forwarded to the port open/close logic
//   nowFn                - time source, stored and also forwarded to the open/close logic
//   lowPriorityRateLimit - stored and later passed to the register and events reader
TSerialClient::TSerialClient(PPort port,
                             const TPortOpenCloseLogic::TSettings& openCloseSettings,
                             util::TGetNowFn nowFn,
                             size_t lowPriorityRateLimit)
    : Port(port),
      OpenCloseLogic(openCloseSettings, nowFn),
      ConnectLogger(PORT_OPEN_ERROR_NOTIFICATION_INTERVAL, "[serial client] "),
      NowFn(nowFn),
      LowPriorityRateLimit(lowPriorityRateLimit)
{
    // Everything is set up by the member initializers above
}
98✔
50

51
// Closes the port on destruction if it is still open
TSerialClient::~TSerialClient()
{
    const bool portIsOpen = Port->IsOpen();
    if (portIsOpen) {
        Port->Close();
    }
}
98✔
57

58
// Adds a device to the client and creates a handler for each of its registers.
// Throws TSerialDeviceException if the register reader already exists (the client is active)
// or if one of the registers already has a handler. Registers processed before a duplicate
// is found stay registered, and the device itself is not added in that case.
void TSerialClient::AddDevice(PSerialDevice device)
{
    if (RegReader)
        throw TSerialDeviceException("can't add registers to the active client");
    for (const auto& reg: device->GetRegisters()) {
        if (Handlers.find(reg) != Handlers.end())
            throw TSerialDeviceException("duplicate register");
        // The handler is stored directly: a local copy of the pointer was never used
        Handlers[reg] = std::make_shared<TRegisterHandler>(reg);
        RegList.push_back(reg);
        LOG(Debug) << "AddRegister: " << reg;
    }
    Devices.push_back(device);
}
84✔
71

72
void TSerialClient::Activate()
620✔
73
{
74
    if (!RegReader) {
620✔
75
        RegReader = std::make_unique<TSerialClientRegisterAndEventsReader>(Devices,
79✔
76
                                                                           GetReadEventsPeriod(*Port),
79✔
77
                                                                           NowFn,
79✔
78
                                                                           LowPriorityRateLimit);
79✔
79
        LastAccessedDevice = std::make_unique<TSerialClientDeviceAccessHandler>(RegReader->GetEventsReader());
79✔
80
    }
81
}
620✔
82

83
void TSerialClient::Connect()
×
84
{
85
    OpenCloseLogic.OpenIfAllowed(Port);
×
86
}
87

88
// Waits until waitUntil, running queued tasks as they arrive.
// Tasks that report RETRY are collected and put back into the queue after the wait is over.
//   currentTime - current time; also the earliest allowed deadline
//   waitUntil   - requested deadline, clamped to currentTime if it is already in the past
void TSerialClient::WaitForPollAndFlush(steady_clock::time_point currentTime, steady_clock::time_point waitUntil)
{
    if (waitUntil < currentTime) {
        waitUntil = currentTime;
    }

    if (Debug.IsEnabled()) {
        const auto nowMs = duration_cast<milliseconds>(currentTime.time_since_epoch()).count();
        const auto untilMs = duration_cast<milliseconds>(waitUntil.time_since_epoch()).count();
        LOG(Debug) << Port->GetDescription() << nowMs << ": Wait until " << untilMs;
    }

    std::vector<PSerialClientTask> tasksToRetry;
    {
        std::unique_lock<std::mutex> guard(TasksMutex);
        const auto hasTasks = [this]() { return !Tasks.empty(); };

        // wait_until returns true only when there are tasks in the queue
        while (TasksCv.wait_until(guard, waitUntil, hasTasks)) {
            // Take the whole queue and run it without holding the mutex
            std::vector<PSerialClientTask> batch;
            batch.swap(Tasks);
            guard.unlock();
            for (auto& task: batch) {
                const auto result = task->Run(Port, *LastAccessedDevice, Devices);
                if (result == ISerialClientTask::TRunResult::RETRY) {
                    tasksToRetry.push_back(task);
                }
            }
            guard.lock();
        }
    }

    // Re-queue outside of the locked section: AddTask takes the mutex itself
    for (auto& task: tasksToRetry) {
        AddTask(task);
    }
}
620✔
119

120
// Reports a polled register to the user callbacks: the error callback if the register
// has a read or write error, the read callback otherwise. Unset callbacks are skipped.
void TSerialClient::ProcessPolledRegister(PRegister reg)
{
    const bool hasError =
        reg->GetErrorState().test(TRegister::ReadError) || reg->GetErrorState().test(TRegister::WriteError);

    if (hasError) {
        if (RegisterErrorCallback) {
            RegisterErrorCallback(reg);
        }
        return;
    }

    if (RegisterReadCallback) {
        RegisterReadCallback(reg);
    }
}
1,037✔
132

133
void TSerialClient::Cycle()
620✔
134
{
135
    Activate();
620✔
136

137
    try {
138
        OpenCloseLogic.OpenIfAllowed(Port);
623✔
139
    } catch (const std::exception& e) {
6✔
140
        ConnectLogger.Log(e.what(), Debug, Error);
3✔
141
    }
142

143
    if (Port->IsOpen()) {
620✔
144
        ConnectLogger.DropTimeout();
616✔
145
        OpenPortCycle();
616✔
146
    } else {
147
        ClosedPortCycle();
4✔
148
    }
149
}
620✔
150

151
void TSerialClient::ClosedPortCycle()
4✔
152
{
153
    auto currentTime = NowFn();
4✔
154
    auto waitUntil = currentTime + CLOSED_PORT_CYCLE_TIME;
4✔
155
    WaitForPollAndFlush(currentTime, waitUntil);
4✔
156

157
    RegReader->ClosedPortCycle(waitUntil, [this](PRegister reg) { ProcessPolledRegister(reg); });
12✔
158
}
4✔
159

160
// Stores a new text value in the register's handler and queues a write task for it.
// Throws TSerialDeviceException (from GetHandler) if the register is unknown to the client.
void TSerialClient::SetTextValue(PRegister reg, const std::string& value)
{
    auto regHandler = GetHandler(reg);
    regHandler->SetTextValue(value);
    AddTask(
        std::make_shared<TWriteChannelSerialClientTask>(regHandler, RegisterReadCallback, RegisterErrorCallback));
}
109✔
168

169
// Sets the callback invoked for registers polled without read or write errors.
// The same callback is handed to write tasks created by SetTextValue.
void TSerialClient::SetReadCallback(const TSerialClient::TRegisterCallback& callback)
{
    RegisterReadCallback = callback;
}
98✔
173

174
// Sets the callback invoked for polled registers that have a read or write error.
// The same callback is handed to write tasks created by SetTextValue.
void TSerialClient::SetErrorCallback(const TSerialClient::TRegisterCallback& callback)
{
    RegisterErrorCallback = callback;
}
98✔
178

179
// Returns the handler created for the register by AddDevice.
// Throws TSerialDeviceException if the register has no handler.
PRegisterHandler TSerialClient::GetHandler(PRegister reg) const
{
    const auto found = Handlers.find(reg);
    if (found != Handlers.end()) {
        return found->second;
    }
    throw TSerialDeviceException("register not found");
}
186

187
void TSerialClient::OpenPortCycle()
616✔
188
{
189
    auto currentTime = NowFn();
616✔
190
    auto waitUntil = RegReader->GetDeadline(currentTime);
616✔
191
    // Limit waiting time to be responsive
192
    waitUntil = std::min(waitUntil, currentTime + MAX_POLL_TIME);
616✔
193
    WaitForPollAndFlush(currentTime, waitUntil);
616✔
194

195
    auto device = RegReader->OpenPortCycle(
196
        *Port,
616✔
197
        [this](PRegister reg) { ProcessPolledRegister(reg); },
1,029✔
198
        *LastAccessedDevice);
1,848✔
199

200
    if (device) {
616✔
201
        OpenCloseLogic.CloseIfNeeded(Port, device->GetConnectionState() == TDeviceConnectionState::DISCONNECTED);
616✔
202
    }
203
}
616✔
204

205
// Returns the port this client was created with
PPort TSerialClient::GetPort()
{
    return Port;
}
209

NEW
210
std::list<PSerialDevice> TSerialClient::GetDevices()
×
211
{
NEW
212
    return Devices;
×
213
}
214

215
// Puts a task into the queue and wakes up the waiting cycle.
// The queue is modified under TasksMutex; the notification is sent after the mutex is released.
void TSerialClient::AddTask(PSerialClientTask task)
{
    {
        std::lock_guard<std::mutex> guard(TasksMutex);
        Tasks.push_back(task);
    }
    TasksCv.notify_all();
}
120✔
223

224
// Creates the reader for the given devices.
//   devices              - devices handed to both the register poller and the events reader
//   readEventsPeriod     - period used to schedule events reading
//   nowFn                - time source
//   lowPriorityRateLimit - forwarded to the register poller
// A low priority polling entry is scheduled in the time balancer for the current time.
TSerialClientRegisterAndEventsReader::TSerialClientRegisterAndEventsReader(const std::list<PSerialDevice>& devices,
                                                                           std::chrono::milliseconds readEventsPeriod,
                                                                           util::TGetNowFn nowFn,
                                                                           size_t lowPriorityRateLimit)
    : EventsReader(std::make_shared<TSerialClientEventsReader>(MAX_EVENT_READ_ERRORS)),
      RegisterPoller(lowPriorityRateLimit),
      TimeBalancer(BALANCING_THRESHOLD),
      ReadEventsPeriod(readEventsPeriod),
      SpentTime(nowFn),
      LastCycleWasTooSmallToPoll(false),
      NowFn(nowFn)
{
    const auto now = NowFn();
    RegisterPoller.SetDevices(devices, now);
    EventsReader->SetDevices(devices);
    TimeBalancer.AddEntry(TClientTaskType::POLLING, now, TPriority::Low);
}
92✔
241

242
void TSerialClientRegisterAndEventsReader::ClosedPortCycle(std::chrono::steady_clock::time_point currentTime,
4✔
243
                                                           TRegisterCallback regCallback)
244
{
245
    EventsReader->SetReadErrors(regCallback);
4✔
246
    RegisterPoller.ClosedPortCycle(currentTime, regCallback);
4✔
247
}
4✔
248

249
class TSerialClientTaskHandler
250
{
251
public:
252
    TClientTaskType TaskType;
253
    TItemAccumulationPolicy Policy;
254
    milliseconds PollLimit;
255
    bool NotReady = true;
256

257
    bool operator()(TClientTaskType task, TItemAccumulationPolicy policy, milliseconds pollLimit)
830✔
258
    {
259
        PollLimit = pollLimit;
830✔
260
        TaskType = task;
830✔
261
        Policy = policy;
830✔
262
        NotReady = false;
830✔
263
        return true;
830✔
264
    }
265
};
266

267
// Cycle for an open port: asks the time balancer for the next task and either reads events
// or polls registers.
//   port               - port used for the exchange
//   regCallback        - callback passed to the events reader and the register poller
//   lastAccessedDevice - device access handler shared between events reading and polling
// Returns the device reported by the register poller, or nullptr if the balancer selected
// nothing or the cycle was spent on events.
PSerialDevice TSerialClientRegisterAndEventsReader::OpenPortCycle(TPort& port,
                                                                  TRegisterCallback regCallback,
                                                                  TSerialClientDeviceAccessHandler& lastAccessedDevice)
{
    // Count idle time as high priority task time to faster reach time balancing threshold
    if (LastCycleWasTooSmallToPoll) {
        const auto idleTime = ceil<milliseconds>(SpentTime.GetSpentTime());
        TimeBalancer.UpdateSelectionTime(idleTime, TPriority::High);
    }

    SpentTime.Start();
    TSerialClientTaskHandler selected;
    TimeBalancer.AccumulateNext(SpentTime.GetStartTime(), selected, TItemSelectionPolicy::All);
    if (selected.NotReady) {
        return nullptr;
    }

    if (selected.TaskType == TClientTaskType::EVENTS) {
        if (EventsReader->HasDevicesWithEnabledEvents()) {
            lastAccessedDevice.PrepareToAccess(nullptr);
            EventsReader->ReadEvents(port, MAX_POLL_TIME, regCallback, NowFn);
            const auto readTime = ceil<milliseconds>(SpentTime.GetSpentTime());
            TimeBalancer.UpdateSelectionTime(readTime, TPriority::High);
            // Schedule the next events reading relative to the start of this one
            const auto nextEventsRead = SpentTime.GetStartTime() + ReadEventsPeriod;
            TimeBalancer.AddEntry(TClientTaskType::EVENTS, nextEventsRead, TPriority::High);
        }
        SpentTime.Start();

        // TODO: Need to notify port open/close logic about errors
        return nullptr;
    }

    // Some registers can have theoretical read time more than poll limit.
    // Define special cases when reading can exceed poll limit to read the registers:
    // 1. TimeBalancer can force reading of such registers.
    // 2. If there are not devices with enabled events, the only limiting timeout is MAX_POLL_TIME.
    //    We can miss it and read at least one register.
    const bool forcedByBalancer = (selected.Policy == TItemAccumulationPolicy::Force);
    const bool readAtLeastOneRegister = forcedByBalancer || !EventsReader->HasDevicesWithEnabledEvents();

    const auto pollLimit = std::min(selected.PollLimit, MAX_POLL_TIME);
    auto pollResult =
        RegisterPoller.OpenPortCycle(port, SpentTime, pollLimit, readAtLeastOneRegister, lastAccessedDevice, regCallback);

    TimeBalancer.AddEntry(TClientTaskType::POLLING, pollResult.Deadline, TPriority::Low);
    if (!pollResult.NotEnoughTime) {
        LastCycleWasTooSmallToPoll = false;
        const auto pollTime = ceil<milliseconds>(SpentTime.GetSpentTime());
        TimeBalancer.UpdateSelectionTime(pollTime, TPriority::Low);
    } else {
        LastCycleWasTooSmallToPoll = true;
    }

    // Make sure events reading is scheduled whenever there are devices with enabled events
    if (EventsReader->HasDevicesWithEnabledEvents() && !TimeBalancer.Contains(TClientTaskType::EVENTS)) {
        const auto firstEventsRead = SpentTime.GetStartTime() + ReadEventsPeriod;
        TimeBalancer.AddEntry(TClientTaskType::EVENTS, firstEventsRead, TPriority::High);
    }

    SpentTime.Start();
    return pollResult.Device;
}
328

329
std::chrono::steady_clock::time_point TSerialClientRegisterAndEventsReader::GetDeadline(
830✔
330
    std::chrono::steady_clock::time_point currentTime) const
331
{
332
    return TimeBalancer.GetDeadline();
830✔
333
}
334

335
// Returns the events reader created in the constructor
PSerialClientEventsReader TSerialClientRegisterAndEventsReader::GetEventsReader() const
{
    return EventsReader;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc