• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wirenboard / wb-mqtt-serial / 4

13 Oct 2025 05:12PM UTC coverage: 73.348%. Remained the same
4

push

github

web-flow
Add WASM branch

6750 of 9574 branches covered (70.5%)

8 of 8 new or added lines in 2 files covered. (100.0%)

2 existing lines in 1 file now uncovered.

12797 of 17447 relevant lines covered (73.35%)

410.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

13.3
/src/rpc/rpc_device_handler.cpp
1
#include "rpc_device_handler.h"
2
#include "rpc_device_load_config_task.h"
3
#include "rpc_device_load_task.h"
4
#include "rpc_device_probe_task.h"
5
#include "rpc_device_set_task.h"
6
#include "rpc_helpers.h"
7

8
#define LOG(logger) ::logger.Log() << "[RPC] "
9

10
void TRPCDeviceParametersCache::RegisterCallbacks(PHandlerConfig handlerConfig)
×
11
{
12
    for (const auto& portConfig: handlerConfig->PortConfigs) {
×
13
        for (const auto& device: portConfig->Devices) {
×
14
            std::string id = GetId(*portConfig->Port, device->Device->DeviceConfig()->SlaveId);
×
15
            device->Device->AddOnConnectionStateChangedCallback([this, id](PSerialDevice device) {
×
16
                if (device->GetConnectionState() == TDeviceConnectionState::DISCONNECTED) {
×
17
                    Remove(id);
×
18
                }
19
            });
×
20
        }
21
    }
22
}
23

24
std::string TRPCDeviceParametersCache::GetId(const TPort& port, const std::string& slaveId) const
×
25
{
26
    return port.GetDescription(false) + ":" + slaveId;
×
27
}
28

29
void TRPCDeviceParametersCache::Add(const std::string& id, const Json::Value& value)
×
30
{
31
    std::unique_lock lock(Mutex);
×
32
    DeviceParameters[id] = value;
×
33
}
34

35
void TRPCDeviceParametersCache::Remove(const std::string& id)
×
36
{
37
    std::unique_lock lock(Mutex);
×
38
    DeviceParameters.erase(id);
×
39
}
40

41
bool TRPCDeviceParametersCache::Contains(const std::string& id) const
×
42
{
43
    std::unique_lock lock(Mutex);
×
44
    return DeviceParameters.find(id) != DeviceParameters.end();
×
45
}
46

47
const Json::Value& TRPCDeviceParametersCache::Get(const std::string& id, const Json::Value& defaultValue) const
×
48
{
49
    std::unique_lock lock(Mutex);
×
50
    auto it = DeviceParameters.find(id);
×
51
    return it != DeviceParameters.end() ? it->second : defaultValue;
×
52
};
53

54
#ifndef __EMSCRIPTEN__
UNCOV
55
TRPCDeviceHelper::TRPCDeviceHelper(const Json::Value& request,
×
56
                                   const TSerialDeviceFactory& deviceFactory,
57
                                   PTemplateMap templates,
58
                                   TSerialClientTaskRunner& serialClientTaskRunner)
×
59
{
60
    auto params = serialClientTaskRunner.GetSerialClientParams(request);
×
61
    if (params.Device == nullptr) {
×
62
        DeviceTemplate = templates->GetTemplate(request["device_type"].asString());
×
63
        auto protocolName = DeviceTemplate->GetProtocol();
×
64
        if (protocolName == "modbus" && request["modbus_mode"].asString() == "TCP") {
×
65
            protocolName += "-tcp";
×
66
        }
67
        ProtocolParams = deviceFactory.GetProtocolParams(protocolName);
×
68
        auto config = std::make_shared<TDeviceConfig>("RPC Device", request["slave_id"].asString(), protocolName);
×
69
        if (ProtocolParams.protocol->IsModbus()) {
×
70
            config->MaxRegHole = Modbus::MAX_HOLE_CONTINUOUS_16_BIT_REGISTERS;
×
71
            config->MaxBitHole = Modbus::MAX_HOLE_CONTINUOUS_1_BIT_REGISTERS;
×
72
            config->MaxReadRegisters = Modbus::MAX_READ_REGISTERS;
×
73
        }
74
        Device = ProtocolParams.factory->CreateDevice(DeviceTemplate->GetTemplate(), config, ProtocolParams.protocol);
×
75
    } else {
76
        Device = params.Device;
×
77
        DeviceTemplate = templates->GetTemplate(Device->DeviceConfig()->DeviceType);
×
78
        ProtocolParams = deviceFactory.GetProtocolParams(DeviceTemplate->GetProtocol());
×
79
        DeviceFromConfig = true;
×
80
    }
81
    if (DeviceTemplate->WithSubdevices()) {
×
82
        throw TRPCException("Device \"" + DeviceTemplate->Type + "\" is not supported by this RPC",
×
83
                            TRPCResultCode::RPC_WRONG_PARAM_VALUE);
×
84
    }
85
}
86
#endif
87

88
TRPCDeviceRequest::TRPCDeviceRequest(const TDeviceProtocolParams& protocolParams,
×
89
                                     PSerialDevice device,
90
                                     PDeviceTemplate deviceTemplate,
91
                                     bool deviceFromConfig)
×
92
    : ProtocolParams(protocolParams),
93
      Device(device),
94
      DeviceTemplate(deviceTemplate),
95
      DeviceFromConfig(deviceFromConfig)
×
96
{
97
    Json::Value responseTimeout = DeviceTemplate->GetTemplate()["response_timeout_ms"];
×
98
    if (responseTimeout.isInt()) {
×
99
        ResponseTimeout = std::chrono::milliseconds(responseTimeout.asInt());
×
100
    }
101

102
    Json::Value frameTimeout = DeviceTemplate->GetTemplate()["frame_timeout_ms"];
×
103
    if (frameTimeout.isInt()) {
×
104
        FrameTimeout = std::chrono::milliseconds(frameTimeout.asInt());
×
105
    }
106
}
107

108
void TRPCDeviceRequest::ParseSettings(const Json::Value& request,
×
109
                                      WBMQTT::TMqttRpcServer::TResultCallback onResult,
110
                                      WBMQTT::TMqttRpcServer::TErrorCallback onError)
111
{
112
    SerialPortSettings = ParseRPCSerialPortSettings(request);
×
113
    WBMQTT::JSON::Get(request, "response_timeout", ResponseTimeout);
×
114
    WBMQTT::JSON::Get(request, "frame_timeout", FrameTimeout);
×
115
    WBMQTT::JSON::Get(request, "total_timeout", TotalTimeout);
×
116
    OnResult = onResult;
×
117
    OnError = onError;
×
118
}
119

120
#ifndef __EMSCRIPTEN__
UNCOV
121
TRPCDeviceHandler::TRPCDeviceHandler(const std::string& requestDeviceLoadConfigSchemaFilePath,
×
122
                                     const std::string& requestDeviceLoadSchemaFilePath,
123
                                     const std::string& requestDeviceSetSchemaFilePath,
124
                                     const std::string& requestDeviceProbeSchemaFilePath,
125
                                     const std::string& requestDeviceSetPollSchemaFilePath,
126
                                     const TSerialDeviceFactory& deviceFactory,
127
                                     PTemplateMap templates,
128
                                     TSerialClientTaskRunner& serialClientTaskRunner,
129
                                     TRPCDeviceParametersCache& parametersCache,
130
                                     WBMQTT::PMqttRpcServer rpcServer)
×
131
    : DeviceFactory(deviceFactory),
132
      RequestDeviceLoadConfigSchema(LoadRPCRequestSchema(requestDeviceLoadConfigSchemaFilePath, "device/LoadConfig")),
×
133
      RequestDeviceLoadSchema(LoadRPCRequestSchema(requestDeviceLoadSchemaFilePath, "device/Load")),
×
134
      RequestDeviceSetSchema(LoadRPCRequestSchema(requestDeviceSetSchemaFilePath, "device/Set")),
×
135
      RequestDeviceProbeSchema(LoadRPCRequestSchema(requestDeviceProbeSchemaFilePath, "device/Probe")),
×
136
      RequestDeviceSetPollSchema(LoadRPCRequestSchema(requestDeviceSetPollSchemaFilePath, "device/SetPoll")),
×
137
      Templates(templates),
138
      SerialClientTaskRunner(serialClientTaskRunner),
139
      ParametersCache(parametersCache)
×
140
{
141
    rpcServer->RegisterAsyncMethod("device",
×
142
                                   "LoadConfig",
143
                                   std::bind(&TRPCDeviceHandler::LoadConfig,
×
144
                                             this,
×
145
                                             std::placeholders::_1,
146
                                             std::placeholders::_2,
147
                                             std::placeholders::_3));
×
148
    rpcServer->RegisterAsyncMethod("device",
×
149
                                   "Load",
150
                                   std::bind(&TRPCDeviceHandler::Load, //
×
151
                                             this,
×
152
                                             std::placeholders::_1,
153
                                             std::placeholders::_2,
154
                                             std::placeholders::_3));
×
155
    rpcServer->RegisterAsyncMethod("device",
×
156
                                   "Set",
157
                                   std::bind(&TRPCDeviceHandler::Set, //
×
158
                                             this,
×
159
                                             std::placeholders::_1,
160
                                             std::placeholders::_2,
161
                                             std::placeholders::_3));
×
162
    rpcServer->RegisterAsyncMethod("device",
×
163
                                   "Probe",
164
                                   std::bind(&TRPCDeviceHandler::Probe,
×
165
                                             this,
×
166
                                             std::placeholders::_1,
167
                                             std::placeholders::_2,
168
                                             std::placeholders::_3));
×
169

170
    rpcServer->RegisterMethod("device", "SetPoll", std::bind(&TRPCDeviceHandler::SetPoll, this, std::placeholders::_1));
×
171
}
172

173
void TRPCDeviceHandler::LoadConfig(const Json::Value& request,
×
174
                                   WBMQTT::TMqttRpcServer::TResultCallback onResult,
175
                                   WBMQTT::TMqttRpcServer::TErrorCallback onError)
176
{
177
    ValidateRPCRequest(request, RequestDeviceLoadConfigSchema);
×
178
    try {
179
        auto helper = TRPCDeviceHelper(request, DeviceFactory, Templates, SerialClientTaskRunner);
×
180
        auto rpcRequest = ParseRPCDeviceLoadConfigRequest(request,
181
                                                          helper.ProtocolParams,
182
                                                          helper.Device,
183
                                                          helper.DeviceTemplate,
184
                                                          helper.DeviceFromConfig,
185
                                                          ParametersCache,
186
                                                          onResult,
187
                                                          onError);
×
188
        SerialClientTaskRunner.RunTask(request, std::make_shared<TRPCDeviceLoadConfigSerialClientTask>(rpcRequest));
×
189
    } catch (const TRPCException& e) {
×
190
        ProcessException(e, onError);
×
191
    }
192
}
193

194
void TRPCDeviceHandler::Load(const Json::Value& request,
×
195
                             WBMQTT::TMqttRpcServer::TResultCallback onResult,
196
                             WBMQTT::TMqttRpcServer::TErrorCallback onError)
197
{
198
    ValidateRPCRequest(request, RequestDeviceLoadSchema);
×
199
    try {
200
        auto helper = TRPCDeviceHelper(request, DeviceFactory, Templates, SerialClientTaskRunner);
×
201
        auto rpcRequest = ParseRPCDeviceLoadRequest(request,
202
                                                    helper.ProtocolParams,
203
                                                    helper.Device,
204
                                                    helper.DeviceTemplate,
205
                                                    helper.DeviceFromConfig,
206
                                                    onResult,
207
                                                    onError);
×
208
        SerialClientTaskRunner.RunTask(request, std::make_shared<TRPCDeviceLoadSerialClientTask>(rpcRequest));
×
209
    } catch (const TRPCException& e) {
×
210
        ProcessException(e, onError);
×
211
    }
212
}
213

214
void TRPCDeviceHandler::Set(const Json::Value& request,
×
215
                            WBMQTT::TMqttRpcServer::TResultCallback onResult,
216
                            WBMQTT::TMqttRpcServer::TErrorCallback onError)
217
{
218
    ValidateRPCRequest(request, RequestDeviceSetSchema);
×
219
    try {
220
        auto helper = TRPCDeviceHelper(request, DeviceFactory, Templates, SerialClientTaskRunner);
×
221
        auto rpcRequest = ParseRPCDeviceSetRequest(request,
222
                                                   helper.ProtocolParams,
223
                                                   helper.Device,
224
                                                   helper.DeviceTemplate,
225
                                                   helper.DeviceFromConfig,
226
                                                   onResult,
227
                                                   onError);
×
228
        SerialClientTaskRunner.RunTask(request, std::make_shared<TRPCDeviceSetSerialClientTask>(rpcRequest));
×
229
    } catch (const TRPCException& e) {
×
230
        ProcessException(e, onError);
×
231
    }
232
}
233

234
void TRPCDeviceHandler::Probe(const Json::Value& request,
×
235
                              WBMQTT::TMqttRpcServer::TResultCallback onResult,
236
                              WBMQTT::TMqttRpcServer::TErrorCallback onError)
237
{
238
    ValidateRPCRequest(request, RequestDeviceProbeSchema);
×
239
    try {
240
        SerialClientTaskRunner.RunTask(request,
×
241
                                       std::make_shared<TRPCDeviceProbeSerialClientTask>(request, onResult, onError));
×
242
    } catch (const TRPCException& e) {
×
243
        ProcessException(e, onError);
×
244
    }
245
}
246

247
Json::Value TRPCDeviceHandler::SetPoll(const Json::Value& request)
×
248
{
249
    ValidateRPCRequest(request, RequestDeviceSetPollSchema);
×
250
    auto params = SerialClientTaskRunner.GetSerialClientParams(request);
×
251
    if (!params.SerialClient || !params.Device) {
×
252
        throw TRPCException("Port or device not found", TRPCResultCode::RPC_WRONG_PARAM_VALUE);
×
253
    }
254
    try {
255
        if (!request["poll"].asBool()) {
×
256
            params.SerialClient->SuspendPoll(params.Device, std::chrono::steady_clock::now());
×
257
        } else {
258
            params.SerialClient->ResumePoll(params.Device);
×
259
        }
260
    } catch (const std::runtime_error& e) {
×
261
        LOG(Warn) << e.what();
×
262
        throw TRPCException(e.what(), TRPCResultCode::RPC_WRONG_PARAM_VALUE);
×
263
    }
264
    return Json::Value(Json::objectValue);
×
265
}
266
#endif
267

268
TRPCRegisterList CreateRegisterList(const TDeviceProtocolParams& protocolParams,
4✔
269
                                    const PSerialDevice& device,
270
                                    const Json::Value& templateItems,
271
                                    const Json::Value& knownItems,
272
                                    const std::string& fwVersion)
273
{
274
    TRPCRegisterList registerList;
4✔
275
    for (auto it = templateItems.begin(); it != templateItems.end(); ++it) {
30✔
276
        const auto& item = *it;
26✔
277
        auto id = templateItems.isObject() ? it.key().asString() : item["id"].asString();
26✔
278
        bool duplicate = false;
26✔
279
        for (const auto& item: registerList) {
67✔
280
            if (item.first == id) {
42✔
281
                duplicate = true;
1✔
282
                break;
1✔
283
            }
284
        }
285
        if (duplicate || item["address"].isNull() || item["readonly"].asBool() || !knownItems[id].isNull()) {
26✔
286
            continue;
5✔
287
        }
288
        if (!fwVersion.empty()) {
21✔
289
            std::string fw = item["fw"].asString();
12✔
290
            if (!fw.empty() && util::CompareVersionStrings(fw, fwVersion) > 0) {
12✔
291
                continue;
4✔
292
            }
293
        }
294
        auto config = LoadRegisterConfig(item,
295
                                         *protocolParams.protocol->GetRegTypes(),
17✔
296
                                         std::string(),
34✔
297
                                         *protocolParams.factory,
17✔
298
                                         protocolParams.factory->GetRegisterAddressFactory().GetBaseRegisterAddress(),
17✔
299
                                         0);
34✔
300
        auto reg = std::make_shared<TRegister>(device, config.RegisterConfig);
17✔
301
        reg->SetAvailable(TRegisterAvailability::AVAILABLE);
17✔
302
        registerList.push_back(std::make_pair(id, reg));
17✔
303
    }
304
    return registerList;
4✔
305
}
306

307
void ReadRegisterList(TPort& port,
×
308
                      PSerialDevice device,
309
                      TRPCRegisterList& registerList,
310
                      Json::Value& result,
311
                      int maxRetries)
312
{
313
    if (registerList.size() == 0) {
×
314
        return;
×
315
    }
316
    TRegisterComparePredicate compare;
317
    std::sort(registerList.begin(),
×
318
              registerList.end(),
319
              [compare](std::pair<std::string, PRegister>& a, std::pair<std::string, PRegister>& b) {
×
320
                  return compare(b.second, a.second);
×
321
              });
322

323
    std::string error;
×
324
    for (int i = 0; i <= maxRetries; i++) {
×
325
        try {
326
            device->Prepare(port, TDevicePrepareMode::WITHOUT_SETUP);
×
327
            break;
×
328
        } catch (const TSerialDeviceException& e) {
×
329
            if (i == maxRetries) {
×
330
                error = std::string("Failed to prepare session: ") + e.what();
×
331
                LOG(Warn) << port.GetDescription() << " " << device->ToString() << ": " << error;
×
332
                throw TRPCException(error, TRPCResultCode::RPC_WRONG_PARAM_VALUE);
×
333
            }
334
        }
335
    }
336

337
    size_t index = 0;
×
338
    while (index < registerList.size() && error.empty()) {
×
339
        auto first = registerList[index].second;
×
340
        auto range = device->CreateRegisterRange();
×
341
        while (index < registerList.size() &&
×
342
               range->Add(port, registerList[index].second, std::chrono::milliseconds::max()))
×
343
        {
344
            ++index;
×
345
        }
346
        for (int i = 0; i <= maxRetries; ++i) {
×
347
            try {
348
                device->ReadRegisterRange(port, range, true);
×
349
                break;
×
350
            } catch (const TSerialDeviceException& e) {
×
351
                if (i == maxRetries) {
×
352
                    error = "Failed to read " + std::to_string(range->RegisterList().size()) +
×
353
                            " registers starting from <" + first->GetConfig()->ToString() + ">: " + e.what();
×
354
                }
355
            }
356
        }
357
    }
358

359
    try {
360
        device->EndSession(port);
×
361
    } catch (const TSerialDeviceException& e) {
×
362
        LOG(Warn) << port.GetDescription() << " " << device->ToString() << " unable to end session: " << e.what();
×
363
    }
364

365
    if (!error.empty()) {
×
366
        LOG(Warn) << port.GetDescription() << " " << device->ToString() << ": " << error;
×
367
        throw TRPCException(error, TRPCResultCode::RPC_WRONG_PARAM_VALUE);
×
368
    }
369

370
    for (size_t i = 0; i < registerList.size(); ++i) {
×
371
        auto& reg = registerList[i];
×
372
        result[reg.first] = RawValueToJSON(*reg.second->GetConfig(), reg.second->GetValue());
×
373
    }
374
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc