• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wirenboard / wb-mqtt-serial / 2

29 Dec 2025 12:28PM UTC coverage: 76.817% (+4.0%) from 72.836%
2

Pull #1045

github

54aa0c
pgasheev
up changelog
Pull Request #1045: Fix firmware version in WB-M1W2 template

6873 of 9161 branches covered (75.02%)

12966 of 16879 relevant lines covered (76.82%)

1651.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

12.56
/src/rpc/rpc_device_handler.cpp
1
#include "rpc_device_handler.h"
2
#include "rpc_device_load_config_task.h"
3
#include "rpc_device_load_task.h"
4
#include "rpc_device_probe_task.h"
5
#include "rpc_device_set_task.h"
6
#include "rpc_helpers.h"
7

8
#define LOG(logger) ::logger.Log() << "[RPC] "
9

10
// Subscribes to connection state changes of every device of every port in handlerConfig.
// When a device reports TDeviceConnectionState::DISCONNECTED, the entry cached for it is removed.
// The cache key is computed once, at registration time, and the callback captures `this`.
// NOTE(review): presumably the cache must outlive the devices that store the callback - confirm.
void TRPCDeviceParametersCache::RegisterCallbacks(PHandlerConfig handlerConfig)
{
    for (const auto& portConfig: handlerConfig->PortConfigs) {
        for (const auto& deviceConfig: portConfig->Devices) {
            const auto& serialDevice = deviceConfig->Device;
            std::string cacheId = GetId(*portConfig->Port, serialDevice->DeviceConfig()->SlaveId);
            serialDevice->AddOnConnectionStateChangedCallback([this, cacheId](PSerialDevice changedDevice) {
                if (changedDevice->GetConnectionState() != TDeviceConnectionState::DISCONNECTED) {
                    return;
                }
                Remove(cacheId);
            });
        }
    }
}
23

24
std::string TRPCDeviceParametersCache::GetId(const TPort& port, const std::string& slaveId) const
×
25
{
26
    return port.GetDescription(false) + ":" + slaveId;
×
27
}
28

29
// Stores value under id while holding Mutex; a value already stored for id is overwritten.
void TRPCDeviceParametersCache::Add(const std::string& id, const Json::Value& value)
{
    std::lock_guard guard(Mutex);
    DeviceParameters[id] = value;
}
34

35
void TRPCDeviceParametersCache::Remove(const std::string& id)
×
36
{
37
    std::unique_lock lock(Mutex);
×
38
    DeviceParameters.erase(id);
×
39
}
40

41
bool TRPCDeviceParametersCache::Contains(const std::string& id) const
×
42
{
43
    std::unique_lock lock(Mutex);
×
44
    return DeviceParameters.find(id) != DeviceParameters.end();
×
45
}
46

47
// Looks up the value stored for id.
// Returns a reference to the stored value, or defaultValue itself when nothing is stored for id.
// Mutex is held only for the lookup, so the returned reference is used by the caller after
// the lock has been released. When id is missing the result aliases defaultValue, so the caller
// must keep defaultValue alive for as long as it uses the result.
const Json::Value& TRPCDeviceParametersCache::Get(const std::string& id, const Json::Value& defaultValue) const
{
    std::unique_lock lock(Mutex);
    const auto it = DeviceParameters.find(id);
    if (it == DeviceParameters.end()) {
        return defaultValue;
    }
    return it->second;
}
53

54
// Resolves the device, device template and protocol parameters addressed by an RPC request.
//
// If the task runner returns a device for the request, that device is used, its template is
// looked up by the device's DeviceType and DeviceFromConfig is set to true.
// Otherwise a new device is created from the request fields "device_type" and "slave_id".
// For the "modbus" protocol with "modbus_mode" equal to "TCP" the "modbus-tcp" protocol is used.
//
// Throws TRPCException with RPC_WRONG_PARAM_VALUE if the template has subdevices.
TRPCDeviceHelper::TRPCDeviceHelper(const Json::Value& request,
                                   const TSerialDeviceFactory& deviceFactory,
                                   PTemplateMap templates,
                                   TSerialClientTaskRunner& serialClientTaskRunner)
{
    auto clientParams = serialClientTaskRunner.GetSerialClientParams(request);
    if (clientParams.Device != nullptr) {
        Device = clientParams.Device;
        DeviceTemplate = templates->GetTemplate(Device->DeviceConfig()->DeviceType);
        ProtocolParams = deviceFactory.GetProtocolParams(Device->Protocol()->GetName());
        DeviceFromConfig = true;
    } else {
        DeviceTemplate = templates->GetTemplate(request["device_type"].asString());
        auto protocol = DeviceTemplate->GetProtocol();
        const bool modbusOverTcp = protocol == "modbus" && request["modbus_mode"].asString() == "TCP";
        if (modbusOverTcp) {
            protocol += "-tcp";
        }
        ProtocolParams = deviceFactory.GetProtocolParams(protocol);
        auto deviceConfig = std::make_shared<TDeviceConfig>("RPC Device", request["slave_id"].asString(), protocol);
        if (ProtocolParams.protocol->IsModbus()) {
            // Modbus devices get the protocol-wide limits for holes and read size
            deviceConfig->MaxRegHole = Modbus::MAX_HOLE_CONTINUOUS_16_BIT_REGISTERS;
            deviceConfig->MaxBitHole = Modbus::MAX_HOLE_CONTINUOUS_1_BIT_REGISTERS;
            deviceConfig->MaxReadRegisters = Modbus::MAX_READ_REGISTERS;
        }
        Device =
            ProtocolParams.factory->CreateDevice(DeviceTemplate->GetTemplate(), deviceConfig, ProtocolParams.protocol);
    }

    if (DeviceTemplate->WithSubdevices()) {
        throw TRPCException("Device \"" + DeviceTemplate->Type + "\" is not supported by this RPC",
                            TRPCResultCode::RPC_WRONG_PARAM_VALUE);
    }
}
85

86
// Stores the request context and takes timeouts from the device template.
// ResponseTimeout and FrameTimeout are overwritten only when the template fields
// "response_timeout_ms" / "frame_timeout_ms" hold integer values (in milliseconds).
TRPCDeviceRequest::TRPCDeviceRequest(const TDeviceProtocolParams& protocolParams,
                                     PSerialDevice device,
                                     PDeviceTemplate deviceTemplate,
                                     bool deviceFromConfig)
    : ProtocolParams(protocolParams),
      Device(device),
      DeviceTemplate(deviceTemplate),
      DeviceFromConfig(deviceFromConfig)
{
    if (Json::Value timeout = DeviceTemplate->GetTemplate()["response_timeout_ms"]; timeout.isInt()) {
        ResponseTimeout = std::chrono::milliseconds(timeout.asInt());
    }

    if (Json::Value timeout = DeviceTemplate->GetTemplate()["frame_timeout_ms"]; timeout.isInt()) {
        FrameTimeout = std::chrono::milliseconds(timeout.asInt());
    }
}
105

106
// Reads per-request settings: serial port settings and the optional "response_timeout",
// "frame_timeout" and "total_timeout" fields, then stores the completion callbacks.
// The callbacks are stored last, so they stay untouched if parsing throws.
void TRPCDeviceRequest::ParseSettings(const Json::Value& request,
                                      WBMQTT::TMqttRpcServer::TResultCallback onResult,
                                      WBMQTT::TMqttRpcServer::TErrorCallback onError)
{
    // Port settings
    SerialPortSettings = ParseRPCSerialPortSettings(request);

    // Timeouts taken from the request
    WBMQTT::JSON::Get(request, "response_timeout", ResponseTimeout);
    WBMQTT::JSON::Get(request, "frame_timeout", FrameTimeout);
    WBMQTT::JSON::Get(request, "total_timeout", TotalTimeout);

    // Completion callbacks
    OnResult = onResult;
    OnError = onError;
}
117

118
// Loads the request schemas with LoadRPCRequestSchema and registers the "device" RPC methods
// on rpcServer: LoadConfig, Load, Set and Probe as asynchronous methods, SetPoll as a synchronous one.
// The registered callables are bound to `this`.
// NOTE(review): presumably the handler must outlive rpcServer's use of these methods - confirm.
TRPCDeviceHandler::TRPCDeviceHandler(const std::string& requestDeviceLoadConfigSchemaFilePath,
                                     const std::string& requestDeviceLoadSchemaFilePath,
                                     const std::string& requestDeviceSetSchemaFilePath,
                                     const std::string& requestDeviceProbeSchemaFilePath,
                                     const std::string& requestDeviceSetPollSchemaFilePath,
                                     const TSerialDeviceFactory& deviceFactory,
                                     PTemplateMap templates,
                                     TSerialClientTaskRunner& serialClientTaskRunner,
                                     TRPCDeviceParametersCache& parametersCache,
                                     WBMQTT::PMqttRpcServer rpcServer)
    : DeviceFactory(deviceFactory),
      RequestDeviceLoadConfigSchema(LoadRPCRequestSchema(requestDeviceLoadConfigSchemaFilePath, "device/LoadConfig")),
      RequestDeviceLoadSchema(LoadRPCRequestSchema(requestDeviceLoadSchemaFilePath, "device/Load")),
      RequestDeviceSetSchema(LoadRPCRequestSchema(requestDeviceSetSchemaFilePath, "device/Set")),
      RequestDeviceProbeSchema(LoadRPCRequestSchema(requestDeviceProbeSchemaFilePath, "device/Probe")),
      RequestDeviceSetPollSchema(LoadRPCRequestSchema(requestDeviceSetPollSchemaFilePath, "device/SetPoll")),
      Templates(templates),
      SerialClientTaskRunner(serialClientTaskRunner),
      ParametersCache(parametersCache)
{
    using namespace std::placeholders;

    // Asynchronous methods receive (request, onResult, onError)
    rpcServer->RegisterAsyncMethod("device",
                                   "LoadConfig",
                                   std::bind(&TRPCDeviceHandler::LoadConfig, this, _1, _2, _3));
    rpcServer->RegisterAsyncMethod("device", "Load", std::bind(&TRPCDeviceHandler::Load, this, _1, _2, _3));
    rpcServer->RegisterAsyncMethod("device", "Set", std::bind(&TRPCDeviceHandler::Set, this, _1, _2, _3));
    rpcServer->RegisterAsyncMethod("device", "Probe", std::bind(&TRPCDeviceHandler::Probe, this, _1, _2, _3));

    // SetPoll returns its result directly
    rpcServer->RegisterMethod("device", "SetPoll", std::bind(&TRPCDeviceHandler::SetPoll, this, _1));
}
169

170
// Handler of the "device/LoadConfig" RPC.
// Validates the request against the LoadConfig schema, resolves the target device and passes
// a TRPCDeviceLoadConfigSerialClientTask to the task runner.
// TRPCException thrown while resolving the device or scheduling the task is handed to
// ProcessException together with onError. Validation runs outside of that try block.
void TRPCDeviceHandler::LoadConfig(const Json::Value& request,
                                   WBMQTT::TMqttRpcServer::TResultCallback onResult,
                                   WBMQTT::TMqttRpcServer::TErrorCallback onError)
{
    ValidateRPCRequest(request, RequestDeviceLoadConfigSchema);
    try {
        TRPCDeviceHelper helper(request, DeviceFactory, Templates, SerialClientTaskRunner);
        auto loadConfigRequest = ParseRPCDeviceLoadConfigRequest(request,
                                                                 helper.ProtocolParams,
                                                                 helper.Device,
                                                                 helper.DeviceTemplate,
                                                                 helper.DeviceFromConfig,
                                                                 ParametersCache,
                                                                 onResult,
                                                                 onError);
        SerialClientTaskRunner.RunTask(
            request,
            std::make_shared<TRPCDeviceLoadConfigSerialClientTask>(loadConfigRequest));
    } catch (const TRPCException& rpcError) {
        ProcessException(rpcError, onError);
    }
}
190

191
// Handler of the "device/Load" RPC.
// Validates the request against the Load schema, resolves the target device and passes
// a TRPCDeviceLoadSerialClientTask to the task runner.
// TRPCException thrown while resolving the device or scheduling the task is handed to
// ProcessException together with onError. Validation runs outside of that try block.
void TRPCDeviceHandler::Load(const Json::Value& request,
                             WBMQTT::TMqttRpcServer::TResultCallback onResult,
                             WBMQTT::TMqttRpcServer::TErrorCallback onError)
{
    ValidateRPCRequest(request, RequestDeviceLoadSchema);
    try {
        TRPCDeviceHelper helper(request, DeviceFactory, Templates, SerialClientTaskRunner);
        auto loadRequest = ParseRPCDeviceLoadRequest(request,
                                                     helper.ProtocolParams,
                                                     helper.Device,
                                                     helper.DeviceTemplate,
                                                     helper.DeviceFromConfig,
                                                     onResult,
                                                     onError);
        SerialClientTaskRunner.RunTask(request, std::make_shared<TRPCDeviceLoadSerialClientTask>(loadRequest));
    } catch (const TRPCException& rpcError) {
        ProcessException(rpcError, onError);
    }
}
210

211
// Handler of the "device/Set" RPC.
// Validates the request against the Set schema, resolves the target device and passes
// a TRPCDeviceSetSerialClientTask to the task runner.
// TRPCException thrown while resolving the device or scheduling the task is handed to
// ProcessException together with onError. Validation runs outside of that try block.
void TRPCDeviceHandler::Set(const Json::Value& request,
                            WBMQTT::TMqttRpcServer::TResultCallback onResult,
                            WBMQTT::TMqttRpcServer::TErrorCallback onError)
{
    ValidateRPCRequest(request, RequestDeviceSetSchema);
    try {
        TRPCDeviceHelper helper(request, DeviceFactory, Templates, SerialClientTaskRunner);
        auto setRequest = ParseRPCDeviceSetRequest(request,
                                                   helper.ProtocolParams,
                                                   helper.Device,
                                                   helper.DeviceTemplate,
                                                   helper.DeviceFromConfig,
                                                   onResult,
                                                   onError);
        SerialClientTaskRunner.RunTask(request, std::make_shared<TRPCDeviceSetSerialClientTask>(setRequest));
    } catch (const TRPCException& rpcError) {
        ProcessException(rpcError, onError);
    }
}
230

231
// Handler of the "device/Probe" RPC.
// Validates the request against the Probe schema and passes a TRPCDeviceProbeSerialClientTask,
// built directly from the request and callbacks, to the task runner.
// TRPCException thrown while creating or scheduling the task is handed to ProcessException
// together with onError. Validation runs outside of that try block.
void TRPCDeviceHandler::Probe(const Json::Value& request,
                              WBMQTT::TMqttRpcServer::TResultCallback onResult,
                              WBMQTT::TMqttRpcServer::TErrorCallback onError)
{
    ValidateRPCRequest(request, RequestDeviceProbeSchema);
    try {
        SerialClientTaskRunner.RunTask(
            request,
            std::make_shared<TRPCDeviceProbeSerialClientTask>(request, onResult, onError));
    } catch (const TRPCException& rpcError) {
        ProcessException(rpcError, onError);
    }
}
243

244
// Handler of the "device/SetPoll" RPC: resumes polling of a device when the request field
// "poll" is true and suspends it otherwise.
// Returns an empty JSON object on success.
// Throws TRPCException with RPC_WRONG_PARAM_VALUE when the port or device is not found, or when
// suspending/resuming throws std::runtime_error (its message is logged and forwarded).
Json::Value TRPCDeviceHandler::SetPoll(const Json::Value& request)
{
    ValidateRPCRequest(request, RequestDeviceSetPollSchema);

    auto clientParams = SerialClientTaskRunner.GetSerialClientParams(request);
    const bool targetFound = clientParams.SerialClient && clientParams.Device;
    if (!targetFound) {
        throw TRPCException("Port or device not found", TRPCResultCode::RPC_WRONG_PARAM_VALUE);
    }

    try {
        if (request["poll"].asBool()) {
            clientParams.SerialClient->ResumePoll(clientParams.Device);
        } else {
            clientParams.SerialClient->SuspendPoll(clientParams.Device, std::chrono::steady_clock::now());
        }
    } catch (const std::runtime_error& pollError) {
        LOG(Warn) << pollError.what();
        throw TRPCException(pollError.what(), TRPCResultCode::RPC_WRONG_PARAM_VALUE);
    }

    return Json::Value(Json::objectValue);
}
263

264
// Builds the list of registers to be accessed for the items of a device template.
//
// templateItems may be a JSON object (the item id is the member key) or an array
// (the item id is taken from the "id" field). An item is skipped when:
//  - an item with the same id is already in the list;
//  - it has no "address";
//  - it is "readonly";
//  - knownItems already has a non-null value for its id;
//  - fwVersion is not empty, the item has a non-empty "fw" field and
//    util::CompareVersionStrings(fw, fwVersion) > 0
//    (presumably: the item needs a firmware newer than fwVersion - confirm).
//
// Every created register is marked as AVAILABLE. checkUnsupported is stored in each list entry
// and is reset for entries whose "enum" list or "min".."max" range contains the value that
// otherwise means "unsupported" (see the comment inside the loop).
TRPCRegisterList CreateRegisterList(const TDeviceProtocolParams& protocolParams,
                                    const PSerialDevice& device,
                                    const Json::Value& templateItems,
                                    const Json::Value& knownItems,
                                    const std::string& fwVersion,
                                    bool checkUnsupported)
{
    TRPCRegisterList registerList;
    for (auto it = templateItems.begin(); it != templateItems.end(); ++it) {
        const auto& item = *it;
        auto id = templateItems.isObject() ? it.key().asString() : item["id"].asString();

        const bool duplicate = std::any_of(registerList.begin(), registerList.end(), [&id](const auto& added) {
            return added.Id == id;
        });
        if (duplicate || item["address"].isNull() || item["readonly"].asBool() || !knownItems[id].isNull()) {
            continue;
        }

        if (!fwVersion.empty()) {
            std::string fw = item["fw"].asString();
            if (!fw.empty() && util::CompareVersionStrings(fw, fwVersion) > 0) {
                continue;
            }
        }

        auto config = LoadRegisterConfig(item,
                                         *protocolParams.protocol->GetRegTypes(),
                                         std::string(),
                                         *protocolParams.factory,
                                         protocolParams.factory->GetRegisterAddressFactory().GetBaseRegisterAddress(),
                                         0);
        TRPCRegister reg = {id, std::make_shared<TRegister>(device, config.RegisterConfig), checkUnsupported};
        reg.Register->SetAvailable(TRegisterAvailability::AVAILABLE);

        // this code checks enums and ranges only for 16-bit register unsupported value 0xFFFE
        // it must be modified to check larger registers like 24, 32 or 64-bits
        if (reg.CheckUnsupported) {
            // 0xFFFE is -2 for signed 16-bit registers and 65534 otherwise
            const int unsupportedValue =
                config.RegisterConfig->Format == S16 ? static_cast<int16_t>(0xFFFE) : static_cast<uint16_t>(0xFFFE);
            if (item.isMember("enum")) {
                // The value is a legal enum member, so it can't be used as the "unsupported" mark
                for (const auto& enumValue: item["enum"]) {
                    if (enumValue.asInt() == unsupportedValue) {
                        reg.CheckUnsupported = false;
                        break;
                    }
                }
            } else if (item["min"].asInt() <= unsupportedValue && item["max"].asInt() >= unsupportedValue) {
                // The value lies inside the allowed range, so it can't be used as the "unsupported" mark
                reg.CheckUnsupported = false;
            }
        }

        registerList.push_back(reg);
    }
    return registerList;
}
325

326
void ReadRegisterList(TPort& port,
×
327
                      PSerialDevice device,
328
                      TRPCRegisterList& registerList,
329
                      Json::Value& result,
330
                      int maxRetries)
331
{
332
    if (registerList.size() == 0) {
×
333
        return;
×
334
    }
335

336
    TRegisterComparePredicate compare;
337
    std::sort(registerList.begin(), registerList.end(), [compare](TRPCRegister& a, TRPCRegister& b) {
×
338
        return compare(b.Register, a.Register);
×
339
    });
340

341
    std::string error;
×
342
    for (int i = 0; i <= maxRetries; i++) {
×
343
        try {
344
            device->Prepare(port, TDevicePrepareMode::WITHOUT_SETUP);
×
345
            break;
×
346
        } catch (const TSerialDeviceException& e) {
×
347
            if (i == maxRetries) {
×
348
                error = std::string("Failed to prepare session: ") + e.what();
×
349
                LOG(Warn) << port.GetDescription() << " " << device->ToString() << ": " << error;
×
350
                throw TRPCException(error, TRPCResultCode::RPC_WRONG_PARAM_VALUE);
×
351
            }
352
        }
353
    }
354

355
    size_t index = 0;
×
356
    while (index < registerList.size() && error.empty()) {
×
357
        auto first = registerList[index].Register;
×
358
        auto range = device->CreateRegisterRange();
×
359
        while (index < registerList.size() &&
×
360
               range->Add(port, registerList[index].Register, std::chrono::milliseconds::max()))
×
361
        {
362
            ++index;
×
363
        }
364
        for (int i = 0; i <= maxRetries; ++i) {
×
365
            try {
366
                device->ReadRegisterRange(port, range, true);
×
367
                break;
×
368
            } catch (const TSerialDevicePermanentRegisterException& e) {
×
369
                LOG(Warn) << port.GetDescription() << " " << device->ToString() << ": "
×
370
                          << "Failed to read " << std::to_string(range->RegisterList().size())
×
371
                          << " registers starting from <" << first->GetConfig()->ToString() + ">: " + e.what();
×
372
                break;
×
373
            } catch (const TSerialDeviceException& e) {
×
374
                if (i == maxRetries) {
×
375
                    error = "Failed to read " + std::to_string(range->RegisterList().size()) +
×
376
                            " registers starting from <" + first->GetConfig()->ToString() + ">: " + e.what();
×
377
                }
378
            }
379
        }
380
    }
381

382
    try {
383
        device->EndSession(port);
×
384
    } catch (const TSerialDeviceException& e) {
×
385
        LOG(Warn) << port.GetDescription() << " " << device->ToString() << " unable to end session: " << e.what();
×
386
    }
387

388
    if (!error.empty()) {
×
389
        LOG(Warn) << port.GetDescription() << " " << device->ToString() << ": " << error;
×
390
        throw TRPCException(error, TRPCResultCode::RPC_WRONG_PARAM_VALUE);
×
391
    }
392

393
    for (size_t i = 0; i < registerList.size(); ++i) {
×
394
        auto& reg = registerList[i];
×
395
        result[reg.Id] = RawValueToJSON(*reg.Register->GetConfig(), reg.Register->GetValue());
×
396
    }
397
}
398

399
// Converts a raw register value to JSON.
// The value is first converted to text with ConvertFromRawValue. If std::stod can parse a number
// at the beginning of that text, the number is returned, otherwise the text itself is returned.
// Text that looks like a number but doesn't fit into a double (std::stod throws
// std::out_of_range) is also returned as text instead of letting the exception escape.
Json::Value RawValueToJSON(const TRegisterConfig& reg, TRegisterValue val)
{
    auto str = ConvertFromRawValue(reg, val);
    try {
        return std::stod(str);
    } catch (const std::invalid_argument&) {
        // Not a number at all
        return str;
    } catch (const std::out_of_range&) {
        // A number outside of the range of double
        return str;
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc