• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wirenboard / wb-mqtt-serial / 2

29 Dec 2025 12:28PM UTC coverage: 76.817% (+4.0%) from 72.836%
2

Pull #1045

github

54aa0c
pgasheev
up changelog
Pull Request #1045: Fix firmware version in WB-M1W2 template

6873 of 9161 branches covered (75.02%)

12966 of 16879 relevant lines covered (76.82%)

1651.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.33
/src/serial_client_events_reader.cpp
1
#include "serial_client_events_reader.h"
2
#include "common_utils.h"
3
#include "log.h"
4
#include "serial_device.h"
5

6
#include <string.h>
7

8
using namespace std::chrono;
9

10
#define LOG(logger) logger.Log() << "[serial client] "
11

12
namespace
13
{
14
    const auto DEFAULT_SPORAIC_ONLY_READ_RATE_LIMIT = std::chrono::milliseconds(500);
15

16
    std::string EventTypeToString(uint8_t eventType)
24✔
17
    {
18
        switch (eventType) {
24✔
19
            case ModbusExt::TEventType::COIL:
×
20
                return "coil";
×
21
            case ModbusExt::TEventType::DISCRETE:
×
22
                return "discrete";
×
23
            case ModbusExt::TEventType::HOLDING:
24✔
24
                return "holding";
24✔
25
            case ModbusExt::TEventType::INPUT:
×
26
                return "input";
×
27
        }
28
        return "unknown";
×
29
    }
30

31
    std::string MakeDeviceDescriptionString(uint8_t slaveId)
80✔
32
    {
33
        return "modbus:" + std::to_string(slaveId);
160✔
34
    }
35

36
    std::string MakeRegisterDescriptionString(uint8_t slaveId, uint8_t eventType, uint16_t eventId)
×
37
    {
38
        return "<" + MakeDeviceDescriptionString(slaveId) + ":" + EventTypeToString(eventType) + ": " +
×
39
               std::to_string(eventId) + ">";
×
40
    }
41

42
    std::string MakeEventDescriptionString(uint8_t slaveId, uint8_t eventType, uint16_t eventId)
50✔
43
    {
44
        if (ModbusExt::IsRegisterEvent(eventType)) {
50✔
45
            return "<" + MakeDeviceDescriptionString(slaveId) + ":" + EventTypeToString(eventType) + ": " +
72✔
46
                   std::to_string(eventId) + ">";
48✔
47
        }
48
        if (eventType == ModbusExt::TEventType::REBOOT) {
26✔
49
            return "<" + MakeDeviceDescriptionString(slaveId) + ": reboot>";
52✔
50
        }
51
        return "unknown event from " + MakeDeviceDescriptionString(slaveId) + " type: " + std::to_string(eventType) +
×
52
               ", id: " + std::to_string(eventId);
×
53
    }
54

55
    ModbusExt::TEventType ToEventRegisterType(const Modbus::RegisterType regType)
20✔
56
    {
57
        switch (regType) {
20✔
58
            case Modbus::REG_COIL:
×
59
                return ModbusExt::TEventType::COIL;
×
60
            case Modbus::REG_DISCRETE:
×
61
                return ModbusExt::TEventType::DISCRETE;
×
62
            case Modbus::REG_HOLDING:
20✔
63
            case Modbus::REG_HOLDING_SINGLE:
64
            case Modbus::REG_HOLDING_MULTI:
65
                return ModbusExt::TEventType::HOLDING;
20✔
66
            case Modbus::REG_INPUT:
×
67
                return ModbusExt::TEventType::INPUT;
×
68
            default:
×
69
                throw std::runtime_error("unsupported register type");
×
70
        }
71
    }
72

73
    TModbusDevice* ToModbusDevice(TSerialDevice* device)
338✔
74
    {
75
        auto dev = dynamic_cast<TModbusDevice*>(device);
338✔
76
        if (dev != nullptr && (dev->Protocol()->GetName() == "modbus" || dev->Protocol()->GetName() == "modbus-tcp")) {
338✔
77
            return dev;
184✔
78
        }
79
        return nullptr;
154✔
80
    }
81

82
    void DisableEventsFromRegs(TFeaturePort& port, const std::list<TEventsReaderRegisterDesc>& regs)
162✔
83
    {
84
        if (regs.empty()) {
162✔
85
            return;
162✔
86
        }
87
        std::unique_ptr<Modbus::IModbusTraits> traits;
×
88
        if (port.IsModbusTcp()) {
×
89
            traits = std::make_unique<Modbus::TModbusTCPTraits>();
×
90
        } else {
91
            traits = std::make_unique<Modbus::TModbusRTUTraits>();
×
92
        }
93
        auto regIt = regs.cbegin();
×
94
        while (regIt != regs.cend()) {
×
95
            uint8_t slaveId = regIt->SlaveId;
×
96
            LOG(Warn) << "Disable unexpected events from " << MakeDeviceDescriptionString(slaveId);
×
97
            ModbusExt::TEventsEnabler enabler(slaveId, port, *traits, [](uint8_t, uint16_t, bool) {});
×
98
            for (; regIt != regs.cend() && slaveId == regIt->SlaveId; ++regIt) {
×
99
                enabler.AddRegister(regIt->Addr,
×
100
                                    static_cast<ModbusExt::TEventType>(regIt->Type),
×
101
                                    ModbusExt::TEventPriority::DISABLE);
102
            }
103
            try {
104
                enabler.SendRequests();
×
105
            } catch (const Modbus::TErrorBase& ex) {
×
106
                LOG(Warn) << "Failed to disable unexpected events: " << ex.what();
×
107
            } catch (const TSerialDeviceException& ex) {
×
108
                LOG(Warn) << "Failed to disable unexpected events: " << ex.what();
×
109
            }
110
        }
111
    }
112

113
    void EnableSporadicOnlyDevicePolling(PSerialDevice device)
26✔
114
    {
115
        if (!device->IsSporadicOnly()) {
26✔
116
            return;
8✔
117
        }
118
        for (auto& reg: device->GetRegisters()) {
20✔
119
            if (reg->IsExcludedFromPolling()) {
18✔
120
                auto config = reg->GetConfig();
32✔
121
                if (!config->ReadPeriod.has_value() && !config->ReadRateLimit.has_value()) {
16✔
122
                    config->ReadRateLimit = DEFAULT_SPORAIC_ONLY_READ_RATE_LIMIT;
10✔
123
                }
124
                reg->IncludeInPolling();
16✔
125
                break;
16✔
126
            }
127
        }
128
    }
129
};
130

131
class TModbusExtEventsVisitor: public ModbusExt::IEventsVisitor
132
{
133
    const TSerialClientEventsReader::TRegsMap& Regs;
134
    std::unordered_set<uint8_t>& DevicesWithEnabledEvents;
135
    TSerialClientEventsReader::TRegisterCallback RegisterChangedCallback;
136
    uint8_t SlaveId = 0;
137
    std::list<TEventsReaderRegisterDesc> RegsToDisable;
138

139
    void ProcessRegisterChangeEvent(uint8_t slaveId,
×
140
                                    uint8_t eventType,
141
                                    uint16_t eventId,
142
                                    const uint8_t* data,
143
                                    size_t dataSize)
144
    {
145
        auto regArray = Regs.find({slaveId, eventId, eventType});
×
146
        if (regArray != Regs.end()) {
×
147
            LOG(Debug) << "Event from " << regArray->second.front()->ToString()
×
148
                       << ", data: " << WBMQTT::HexDump(data, dataSize);
×
149
            uint64_t value = 0;
×
150
            memcpy(&value, data, std::min(dataSize, sizeof(value)));
×
151
            for (auto reg: regArray->second) {
×
152
                reg->SetValue(TRegisterValue(value));
×
153
                RegisterChangedCallback(reg);
×
154
            }
155
        } else {
156
            LOG(Warn) << "Unexpected event from: " << MakeRegisterDescriptionString(slaveId, eventType, eventId);
×
157
            RegsToDisable.emplace_back(TEventsReaderRegisterDesc{slaveId, eventId, eventType});
×
158
        }
159
    }
160

161
    void ProcessDeviceRestartedEvent(uint8_t slaveId)
2✔
162
    {
163
        DevicesWithEnabledEvents.erase(slaveId);
2✔
164
        for (const auto& reg: Regs) {
2✔
165
            if (reg.first.SlaveId == slaveId) {
2✔
166
                LOG(Debug) << "Restart event from " << MakeDeviceDescriptionString(slaveId);
2✔
167
                reg.second.front()->Device()->SetDisconnected();
2✔
168
                return;
2✔
169
            }
170
        }
171
        LOG(Warn) << "Restart event from unknown device " << MakeDeviceDescriptionString(slaveId);
×
172
    }
173

174
public:
175
    TModbusExtEventsVisitor(const TSerialClientEventsReader::TRegsMap& regs,
162✔
176
                            std::unordered_set<uint8_t>& devicesWithEnabledEvents,
177
                            TSerialClientEventsReader::TRegisterCallback registerChangedCallback)
178
        : Regs(regs),
162✔
179
          DevicesWithEnabledEvents(devicesWithEnabledEvents),
180
          RegisterChangedCallback(registerChangedCallback)
162✔
181
    {}
162✔
182

183
    virtual void Event(uint8_t slaveId,
2✔
184
                       uint8_t eventType,
185
                       uint16_t eventId,
186
                       const uint8_t* data,
187
                       size_t dataSize) override
188
    {
189
        SlaveId = slaveId;
2✔
190
        if (ModbusExt::IsRegisterEvent(eventType)) {
2✔
191
            ProcessRegisterChangeEvent(slaveId, eventType, eventId, data, dataSize);
×
192
            return;
×
193
        }
194
        if (eventType == ModbusExt::TEventType::REBOOT) {
2✔
195
            ProcessDeviceRestartedEvent(slaveId);
2✔
196
            return;
2✔
197
        }
198
        LOG(Warn) << "Unexpected event from " << MakeDeviceDescriptionString(slaveId)
×
199
                  << " type: " << static_cast<int>(eventType) << ", id: " << eventId;
×
200
    }
201

202
    uint8_t GetSlaveId() const
2✔
203
    {
204
        return SlaveId;
2✔
205
    }
206

207
    const std::list<TEventsReaderRegisterDesc>& GetRegsToDisable() const
162✔
208
    {
209
        return RegsToDisable;
162✔
210
    }
211
};
212

213
TSerialClientEventsReader::TSerialClientEventsReader(size_t maxReadErrors)
192✔
214
    : LastAccessedSlaveId(0),
215
      ReadErrors(0),
216
      MaxReadErrors(maxReadErrors),
217
      ClearErrorsOnSuccessfulRead(false)
192✔
218
{}
192✔
219

220
void TSerialClientEventsReader::ReadEventsFailed(const std::string& errorMessage, TRegisterCallback registerCallback)
24✔
221
{
222
    LOG(Warn) << "Reading events failed: " << errorMessage;
24✔
223
    ++ReadErrors;
24✔
224
    if (ReadErrors > MaxReadErrors) {
24✔
225
        SetReadErrors(registerCallback);
2✔
226
        ReadErrors = 0;
2✔
227
        ClearErrorsOnSuccessfulRead = true;
2✔
228
    }
229
}
24✔
230

231
void TSerialClientEventsReader::ReadEvents(TFeaturePort& port,
162✔
232
                                           milliseconds maxReadingTime,
233
                                           TRegisterCallback registerCallback,
234
                                           util::TGetNowFn nowFn)
235
{
236
    TModbusExtEventsVisitor visitor(Regs, DevicesWithEnabledEvents, registerCallback);
324✔
237
    util::TSpentTimeMeter spentTimeMeter(nowFn);
324✔
238
    spentTimeMeter.Start();
162✔
239
    std::unique_ptr<Modbus::IModbusTraits> traits;
162✔
240
    if (port.IsModbusTcp()) {
162✔
241
        traits = std::make_unique<Modbus::TModbusTCPTraits>();
×
242
    } else {
243
        traits = std::make_unique<ModbusExt::TModbusRTUWithArbitrationTraits>();
162✔
244
    }
245
    for (auto spentTime = 0us; spentTime < maxReadingTime; spentTime = spentTimeMeter.GetSpentTime()) {
188✔
246
        try {
247
            if (!ModbusExt::ReadEvents(port,
182✔
248
                                       *traits,
182✔
249
                                       floor<milliseconds>(maxReadingTime - spentTime),
364✔
250
                                       LastAccessedSlaveId,
251
                                       EventState,
182✔
252
                                       visitor))
253
            {
254
                LastAccessedSlaveId = 0;
156✔
255
                EventState.Reset();
156✔
256
                ClearReadErrors(registerCallback);
156✔
257
                break;
156✔
258
            }
259
            // TODO: Limit reads from same slaveId
260
            LastAccessedSlaveId = visitor.GetSlaveId();
2✔
261
            ClearReadErrors(registerCallback);
2✔
262
        } catch (const TSerialDeviceException& ex) {
24✔
263
            ReadEventsFailed(ex.what(), registerCallback);
×
264
        } catch (const Modbus::TErrorBase& ex) {
48✔
265
            ReadEventsFailed(ex.what(), registerCallback);
24✔
266
        }
267
    }
268
    DisableEventsFromRegs(port, visitor.GetRegsToDisable());
162✔
269
}
162✔
270

271
void TSerialClientEventsReader::EnableEvents(PSerialDevice device, TFeaturePort& port)
276✔
272
{
273
    auto modbusDevice = ToModbusDevice(device.get());
276✔
274
    if (!modbusDevice) {
276✔
275
        return;
132✔
276
    }
277
    if (!port.SupportsFastModbus()) {
144✔
278
        LOG(Info) << port.GetDescription() << ". Skip enabling events for "
×
279
                  << MakeDeviceDescriptionString(static_cast<uint8_t>(modbusDevice->SlaveId))
×
280
                  << " because Fast Modbus is not supported by gateway";
×
281
        return;
×
282
    }
283
    uint8_t slaveId = static_cast<uint8_t>(modbusDevice->SlaveId);
144✔
284
    DevicesWithEnabledEvents.erase(slaveId);
144✔
285
    std::unique_ptr<Modbus::IModbusTraits> traits;
144✔
286
    if (port.IsModbusTcp()) {
144✔
287
        traits = std::make_unique<Modbus::TModbusTCPTraits>();
×
288
    } else {
289
        traits = std::make_unique<Modbus::TModbusRTUTraits>();
144✔
290
    }
291
    ModbusExt::TEventsEnabler ev(slaveId,
292
                                 port,
293
                                 *traits,
144✔
294
                                 std::bind(&TSerialClientEventsReader::OnEnabledEvent,
×
295
                                           this,
×
296
                                           slaveId,
297
                                           std::placeholders::_1,
298
                                           std::placeholders::_2,
299
                                           std::placeholders::_3),
144✔
300
                                 ModbusExt::TEventsEnabler::DISABLE_EVENTS_IN_HOLES);
432✔
301

302
    try {
303
        for (const auto& regArray: Regs) {
172✔
304
            if (regArray.first.SlaveId == slaveId) {
28✔
305
                ev.AddRegister(regArray.first.Addr,
28✔
306
                               static_cast<ModbusExt::TEventType>(regArray.first.Type),
28✔
307
                               regArray.second.front()->GetConfig()->IsHighPriority() ? ModbusExt::TEventPriority::HIGH
56✔
308
                                                                                      : ModbusExt::TEventPriority::LOW);
309
            }
310
        }
311
        if (ev.HasEventsToSetup()) {
144✔
312
            ev.AddRegister(0, ModbusExt::TEventType::REBOOT, ModbusExt::TEventPriority::DISABLE);
28✔
313
            LOG(Debug) << "Try to enable events for " << MakeDeviceDescriptionString(slaveId);
28✔
314
            ev.SendRequests();
28✔
315
            EnableSporadicOnlyDevicePolling(device);
26✔
316
        }
317
    } catch (const Modbus::TModbusExceptionError& e) {
2✔
318
        LOG(Warn) << "Failed to enable events for " << MakeDeviceDescriptionString(slaveId) << ": " << e.what();
×
319
    } catch (const TResponseTimeoutException& e) {
×
320
        LOG(Warn) << "Failed to enable events for " << MakeDeviceDescriptionString(slaveId) << ": " << e.what();
×
321
    } catch (const Modbus::TErrorBase& e) {
4✔
322
        throw TSerialDeviceTransientErrorException(std::string("Failed to enable events: ") + e.what());
2✔
323
    }
324
}
325

326
void TSerialClientEventsReader::OnEnabledEvent(uint8_t slaveId, uint8_t type, uint16_t addr, bool res)
52✔
327
{
328
    auto regArray = Regs.find({slaveId, addr, type});
52✔
329
    if (regArray != Regs.end()) {
52✔
330
        for (const auto& reg: regArray->second) {
52✔
331
            if (res) {
26✔
332
                if (reg->GetConfig()->SporadicMode == TRegisterConfig::TSporadicMode::ONLY_EVENTS) {
24✔
333
                    reg->ExcludeFromPolling();
22✔
334
                }
335
                reg->SetAvailable(TRegisterAvailability::AVAILABLE);
24✔
336
                DevicesWithEnabledEvents.emplace(slaveId);
24✔
337
            } else {
338
                reg->IncludeInPolling();
2✔
339
            }
340
        }
341
    }
342
    if (res) {
52✔
343
        LOG(Info) << "Events are enabled for " << MakeEventDescriptionString(slaveId, type, addr);
24✔
344
        return;
24✔
345
    }
346
    if (!ModbusExt::IsRegisterEvent(type)) {
28✔
347
        LOG(Info) << "Events are disabled for " << MakeEventDescriptionString(slaveId, type, addr);
26✔
348
    }
349
}
350

351
void TSerialClientEventsReader::SetDevices(const std::list<PSerialDevice>& devices)
192✔
352
{
353
    for (const auto& dev: devices) {
394✔
354
        for (const auto& reg: dev->GetRegisters()) {
1,312✔
355
            if (reg->GetConfig()->SporadicMode != TRegisterConfig::TSporadicMode::DISABLED) {
1,110✔
356
                auto dev = ToModbusDevice(reg->Device().get());
20✔
357
                if (dev != nullptr) {
20✔
358
                    TEventsReaderRegisterDesc regDesc{
359
                        static_cast<uint8_t>(dev->SlaveId),
20✔
360
                        static_cast<uint16_t>(GetUint32RegisterAddress(reg->GetConfig()->GetAddress())),
20✔
361
                        ToEventRegisterType(static_cast<Modbus::RegisterType>(reg->GetConfig()->Type))};
40✔
362
                    Regs[regDesc].push_back(reg);
20✔
363
                }
364
            }
365
        }
366
        dev->AddOnConnectionStateChangedCallback(
404✔
367
            [this](PSerialDevice device) { OnDeviceConnectionStateChanged(device); });
670✔
368
    }
369
}
192✔
370

371
void TSerialClientEventsReader::OnDeviceConnectionStateChanged(PSerialDevice device)
266✔
372
{
373
    if (device->GetConnectionState() == TDeviceConnectionState::DISCONNECTED) {
266✔
374
        auto dev = ToModbusDevice(device.get());
42✔
375
        if (dev != nullptr) {
42✔
376
            DevicesWithEnabledEvents.erase(dev->SlaveId);
20✔
377
        }
378
    }
379
}
266✔
380

381
void TSerialClientEventsReader::SetReadErrors(TRegisterCallback callback)
10✔
382
{
383
    for (const auto& regArray: Regs) {
12✔
384
        for (const auto& reg: regArray.second) {
4✔
385
            if (reg->IsExcludedFromPolling() || reg->Device()->IsSporadicOnly()) {
2✔
386
                reg->SetError(TRegister::TError::ReadError);
2✔
387
                if (callback) {
2✔
388
                    callback(reg);
2✔
389
                }
390
            }
391
        }
392
    }
393
}
10✔
394

395
void TSerialClientEventsReader::ClearReadErrors(TRegisterCallback callback)
158✔
396
{
397
    ReadErrors = 0;
158✔
398
    if (!ClearErrorsOnSuccessfulRead) {
158✔
399
        return;
158✔
400
    }
401
    ClearErrorsOnSuccessfulRead = false;
×
402
    for (const auto& regArray: Regs) {
×
403
        for (const auto& reg: regArray.second) {
×
404
            if (reg->IsExcludedFromPolling() || reg->Device()->IsSporadicOnly()) {
×
405
                reg->ClearError(TRegister::TError::ReadError);
×
406
                if (callback) {
×
407
                    callback(reg);
×
408
                }
409
            }
410
        }
411
    }
412
}
413

414
bool TSerialClientEventsReader::HasDevicesWithEnabledEvents() const
1,892✔
415
{
416
    return !DevicesWithEnabledEvents.empty();
1,892✔
417
}
418

419
bool TEventsReaderRegisterDesc::operator==(const TEventsReaderRegisterDesc& other) const
26✔
420
{
421
    return SlaveId == other.SlaveId && Addr == other.Addr && Type == other.Type;
26✔
422
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc