• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wirenboard / wb-mqtt-serial / 689

26 Aug 2025 01:28PM UTC coverage: 73.031% (-0.08%) from 73.114%
689

push

github

web-flow
Add protocol parameter to device/Probe RPC (#988)

  * Add protocol to Modbus TCP port information in ports/Load RPC
  * Add protocol parameter to device/Probe RPC
  * Set TCP_NODELAY for TCP ports
  * Handle TCP port closing by remote

6605 of 9406 branches covered (70.22%)

28 of 59 new or added lines in 11 files covered. (47.46%)

68 existing lines in 3 files now uncovered.

12554 of 17190 relevant lines covered (73.03%)

372.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

13.3
/src/rpc/rpc_device_handler.cpp
1
#include "rpc_device_handler.h"
2
#include "rpc_device_load_config_task.h"
3
#include "rpc_device_load_task.h"
4
#include "rpc_device_probe_task.h"
5
#include "rpc_device_set_task.h"
6
#include "rpc_helpers.h"
7

8
#define LOG(logger) ::logger.Log() << "[RPC] "
9

10
void TRPCDeviceParametersCache::RegisterCallbacks(PHandlerConfig handlerConfig)
×
11
{
12
    for (const auto& portConfig: handlerConfig->PortConfigs) {
×
13
        for (const auto& device: portConfig->Devices) {
×
14
            std::string id = GetId(*portConfig->Port, device->Device->DeviceConfig()->SlaveId);
×
15
            device->Device->AddOnConnectionStateChangedCallback([this, id](PSerialDevice device) {
×
16
                if (device->GetConnectionState() == TDeviceConnectionState::DISCONNECTED) {
×
17
                    Remove(id);
×
18
                }
19
            });
×
20
        }
21
    }
22
}
23

24
std::string TRPCDeviceParametersCache::GetId(const TPort& port, const std::string& slaveId) const
×
25
{
26
    return port.GetDescription(false) + ":" + slaveId;
×
27
}
28

29
void TRPCDeviceParametersCache::Add(const std::string& id, const Json::Value& value)
×
30
{
31
    std::unique_lock lock(Mutex);
×
32
    DeviceParameters[id] = value;
×
33
}
34

35
void TRPCDeviceParametersCache::Remove(const std::string& id)
×
36
{
37
    std::unique_lock lock(Mutex);
×
38
    DeviceParameters.erase(id);
×
39
}
40

41
bool TRPCDeviceParametersCache::Contains(const std::string& id) const
×
42
{
43
    std::unique_lock lock(Mutex);
×
44
    return DeviceParameters.find(id) != DeviceParameters.end();
×
45
}
46

47
const Json::Value& TRPCDeviceParametersCache::Get(const std::string& id, const Json::Value& defaultValue) const
×
48
{
49
    std::unique_lock lock(Mutex);
×
50
    auto it = DeviceParameters.find(id);
×
51
    return it != DeviceParameters.end() ? it->second : defaultValue;
×
52
};
53

54
TRPCDeviceHelper::TRPCDeviceHelper(const Json::Value& request,
×
55
                                   const TSerialDeviceFactory& deviceFactory,
56
                                   PTemplateMap templates,
57
                                   TSerialClientTaskRunner& serialClientTaskRunner)
×
58
{
59
    auto params = serialClientTaskRunner.GetSerialClientParams(request);
×
60
    if (params.Device == nullptr) {
×
61
        DeviceTemplate = templates->GetTemplate(request["device_type"].asString());
×
UNCOV
62
        auto protocolName = DeviceTemplate->GetProtocol();
×
63
        if (protocolName == "modbus" && request["modbus_mode"].asString() == "TCP") {
×
64
            protocolName += "-tcp";
×
65
        }
66
        ProtocolParams = deviceFactory.GetProtocolParams(protocolName);
×
67
        auto config = std::make_shared<TDeviceConfig>("RPC Device", request["slave_id"].asString(), protocolName);
×
68
        if (ProtocolParams.protocol->IsModbus()) {
×
UNCOV
69
            config->MaxRegHole = Modbus::MAX_HOLE_CONTINUOUS_16_BIT_REGISTERS;
×
70
            config->MaxBitHole = Modbus::MAX_HOLE_CONTINUOUS_1_BIT_REGISTERS;
×
71
            config->MaxReadRegisters = Modbus::MAX_READ_REGISTERS;
×
72
        }
73
        Device = ProtocolParams.factory->CreateDevice(DeviceTemplate->GetTemplate(), config, ProtocolParams.protocol);
×
74
    } else {
75
        Device = params.Device;
×
76
        DeviceTemplate = templates->GetTemplate(Device->DeviceConfig()->DeviceType);
×
UNCOV
77
        ProtocolParams = deviceFactory.GetProtocolParams(DeviceTemplate->GetProtocol());
×
78
        DeviceFromConfig = true;
×
79
    }
80
    if (DeviceTemplate->WithSubdevices()) {
×
UNCOV
81
        throw TRPCException("Device \"" + DeviceTemplate->Type + "\" is not supported by this RPC",
×
UNCOV
82
                            TRPCResultCode::RPC_WRONG_PARAM_VALUE);
×
83
    }
84
}
85

UNCOV
86
TRPCDeviceRequest::TRPCDeviceRequest(const TDeviceProtocolParams& protocolParams,
×
87
                                     PSerialDevice device,
88
                                     PDeviceTemplate deviceTemplate,
UNCOV
89
                                     bool deviceFromConfig)
×
90
    : ProtocolParams(protocolParams),
91
      Device(device),
92
      DeviceTemplate(deviceTemplate),
93
      DeviceFromConfig(deviceFromConfig)
×
94
{
95
    Json::Value responseTimeout = DeviceTemplate->GetTemplate()["response_timeout_ms"];
×
UNCOV
96
    if (responseTimeout.isInt()) {
×
UNCOV
97
        ResponseTimeout = std::chrono::milliseconds(responseTimeout.asInt());
×
98
    }
99

100
    Json::Value frameTimeout = DeviceTemplate->GetTemplate()["frame_timeout_ms"];
×
UNCOV
101
    if (frameTimeout.isInt()) {
×
UNCOV
102
        FrameTimeout = std::chrono::milliseconds(frameTimeout.asInt());
×
103
    }
104
}
105

UNCOV
106
void TRPCDeviceRequest::ParseSettings(const Json::Value& request,
×
107
                                      WBMQTT::TMqttRpcServer::TResultCallback onResult,
108
                                      WBMQTT::TMqttRpcServer::TErrorCallback onError)
109
{
110
    SerialPortSettings = ParseRPCSerialPortSettings(request);
×
111
    WBMQTT::JSON::Get(request, "response_timeout", ResponseTimeout);
×
112
    WBMQTT::JSON::Get(request, "frame_timeout", FrameTimeout);
×
113
    WBMQTT::JSON::Get(request, "total_timeout", TotalTimeout);
×
UNCOV
114
    OnResult = onResult;
×
UNCOV
115
    OnError = onError;
×
116
}
117

UNCOV
118
TRPCDeviceHandler::TRPCDeviceHandler(const std::string& requestDeviceLoadConfigSchemaFilePath,
×
119
                                     const std::string& requestDeviceLoadSchemaFilePath,
120
                                     const std::string& requestDeviceSetSchemaFilePath,
121
                                     const std::string& requestDeviceProbeSchemaFilePath,
122
                                     const std::string& requestDeviceSetPollSchemaFilePath,
123
                                     const TSerialDeviceFactory& deviceFactory,
124
                                     PTemplateMap templates,
125
                                     TSerialClientTaskRunner& serialClientTaskRunner,
126
                                     TRPCDeviceParametersCache& parametersCache,
127
                                     WBMQTT::PMqttRpcServer rpcServer)
×
128
    : DeviceFactory(deviceFactory),
129
      RequestDeviceLoadConfigSchema(LoadRPCRequestSchema(requestDeviceLoadConfigSchemaFilePath, "device/LoadConfig")),
×
130
      RequestDeviceLoadSchema(LoadRPCRequestSchema(requestDeviceLoadSchemaFilePath, "device/Load")),
×
131
      RequestDeviceSetSchema(LoadRPCRequestSchema(requestDeviceSetSchemaFilePath, "device/Set")),
×
UNCOV
132
      RequestDeviceProbeSchema(LoadRPCRequestSchema(requestDeviceProbeSchemaFilePath, "device/Probe")),
×
UNCOV
133
      RequestDeviceSetPollSchema(LoadRPCRequestSchema(requestDeviceSetPollSchemaFilePath, "device/SetPoll")),
×
134
      Templates(templates),
135
      SerialClientTaskRunner(serialClientTaskRunner),
136
      ParametersCache(parametersCache)
×
137
{
138
    rpcServer->RegisterAsyncMethod("device",
×
139
                                   "LoadConfig",
UNCOV
140
                                   std::bind(&TRPCDeviceHandler::LoadConfig,
×
UNCOV
141
                                             this,
×
142
                                             std::placeholders::_1,
143
                                             std::placeholders::_2,
UNCOV
144
                                             std::placeholders::_3));
×
145
    rpcServer->RegisterAsyncMethod("device",
×
146
                                   "Load",
UNCOV
147
                                   std::bind(&TRPCDeviceHandler::Load, //
×
UNCOV
148
                                             this,
×
149
                                             std::placeholders::_1,
150
                                             std::placeholders::_2,
UNCOV
151
                                             std::placeholders::_3));
×
152
    rpcServer->RegisterAsyncMethod("device",
×
153
                                   "Set",
UNCOV
154
                                   std::bind(&TRPCDeviceHandler::Set, //
×
UNCOV
155
                                             this,
×
156
                                             std::placeholders::_1,
157
                                             std::placeholders::_2,
UNCOV
158
                                             std::placeholders::_3));
×
159
    rpcServer->RegisterAsyncMethod("device",
×
160
                                   "Probe",
UNCOV
161
                                   std::bind(&TRPCDeviceHandler::Probe,
×
UNCOV
162
                                             this,
×
163
                                             std::placeholders::_1,
164
                                             std::placeholders::_2,
165
                                             std::placeholders::_3));
×
166

UNCOV
167
    rpcServer->RegisterMethod("device", "SetPoll", std::bind(&TRPCDeviceHandler::SetPoll, this, std::placeholders::_1));
×
168
}
169

UNCOV
170
void TRPCDeviceHandler::LoadConfig(const Json::Value& request,
×
171
                                   WBMQTT::TMqttRpcServer::TResultCallback onResult,
172
                                   WBMQTT::TMqttRpcServer::TErrorCallback onError)
173
{
174
    ValidateRPCRequest(request, RequestDeviceLoadConfigSchema);
×
175
    try {
UNCOV
176
        auto helper = TRPCDeviceHelper(request, DeviceFactory, Templates, SerialClientTaskRunner);
×
177
        auto rpcRequest = ParseRPCDeviceLoadConfigRequest(request,
178
                                                          helper.ProtocolParams,
179
                                                          helper.Device,
180
                                                          helper.DeviceTemplate,
181
                                                          helper.DeviceFromConfig,
182
                                                          ParametersCache,
183
                                                          onResult,
184
                                                          onError);
×
185
        SerialClientTaskRunner.RunTask(request, std::make_shared<TRPCDeviceLoadConfigSerialClientTask>(rpcRequest));
×
UNCOV
186
    } catch (const TRPCException& e) {
×
UNCOV
187
        ProcessException(e, onError);
×
188
    }
189
}
190

UNCOV
191
void TRPCDeviceHandler::Load(const Json::Value& request,
×
192
                             WBMQTT::TMqttRpcServer::TResultCallback onResult,
193
                             WBMQTT::TMqttRpcServer::TErrorCallback onError)
194
{
195
    ValidateRPCRequest(request, RequestDeviceLoadSchema);
×
196
    try {
UNCOV
197
        auto helper = TRPCDeviceHelper(request, DeviceFactory, Templates, SerialClientTaskRunner);
×
198
        auto rpcRequest = ParseRPCDeviceLoadRequest(request,
199
                                                    helper.ProtocolParams,
200
                                                    helper.Device,
201
                                                    helper.DeviceTemplate,
202
                                                    helper.DeviceFromConfig,
203
                                                    onResult,
204
                                                    onError);
×
205
        SerialClientTaskRunner.RunTask(request, std::make_shared<TRPCDeviceLoadSerialClientTask>(rpcRequest));
×
UNCOV
206
    } catch (const TRPCException& e) {
×
UNCOV
207
        ProcessException(e, onError);
×
208
    }
209
}
210

UNCOV
211
void TRPCDeviceHandler::Set(const Json::Value& request,
×
212
                            WBMQTT::TMqttRpcServer::TResultCallback onResult,
213
                            WBMQTT::TMqttRpcServer::TErrorCallback onError)
214
{
215
    ValidateRPCRequest(request, RequestDeviceSetSchema);
×
216
    try {
UNCOV
217
        auto helper = TRPCDeviceHelper(request, DeviceFactory, Templates, SerialClientTaskRunner);
×
218
        auto rpcRequest = ParseRPCDeviceSetRequest(request,
219
                                                   helper.ProtocolParams,
220
                                                   helper.Device,
221
                                                   helper.DeviceTemplate,
222
                                                   helper.DeviceFromConfig,
223
                                                   onResult,
224
                                                   onError);
×
225
        SerialClientTaskRunner.RunTask(request, std::make_shared<TRPCDeviceSetSerialClientTask>(rpcRequest));
×
UNCOV
226
    } catch (const TRPCException& e) {
×
UNCOV
227
        ProcessException(e, onError);
×
228
    }
229
}
230

UNCOV
231
void TRPCDeviceHandler::Probe(const Json::Value& request,
×
232
                              WBMQTT::TMqttRpcServer::TResultCallback onResult,
233
                              WBMQTT::TMqttRpcServer::TErrorCallback onError)
234
{
235
    ValidateRPCRequest(request, RequestDeviceProbeSchema);
×
236
    try {
237
        SerialClientTaskRunner.RunTask(request,
×
238
                                       std::make_shared<TRPCDeviceProbeSerialClientTask>(request, onResult, onError));
×
UNCOV
239
    } catch (const TRPCException& e) {
×
UNCOV
240
        ProcessException(e, onError);
×
241
    }
242
}
243

244
Json::Value TRPCDeviceHandler::SetPoll(const Json::Value& request)
×
245
{
246
    ValidateRPCRequest(request, RequestDeviceSetPollSchema);
×
247
    auto params = SerialClientTaskRunner.GetSerialClientParams(request);
×
UNCOV
248
    if (!params.SerialClient || !params.Device) {
×
UNCOV
249
        throw TRPCException("Port or device not found", TRPCResultCode::RPC_WRONG_PARAM_VALUE);
×
250
    }
251
    try {
UNCOV
252
        if (!request["poll"].asBool()) {
×
253
            params.SerialClient->SuspendPoll(params.Device, std::chrono::steady_clock::now());
×
254
        } else {
255
            params.SerialClient->ResumePoll(params.Device);
×
256
        }
257
    } catch (const std::runtime_error& e) {
×
UNCOV
258
        LOG(Warn) << e.what();
×
259
        throw TRPCException(e.what(), TRPCResultCode::RPC_WRONG_PARAM_VALUE);
×
260
    }
UNCOV
261
    return Json::Value(Json::objectValue);
×
262
}
263

264
TRPCRegisterList CreateRegisterList(const TDeviceProtocolParams& protocolParams,
4✔
265
                                    const PSerialDevice& device,
266
                                    const Json::Value& templateItems,
267
                                    const Json::Value& knownItems,
268
                                    const std::string& fwVersion)
269
{
270
    TRPCRegisterList registerList;
4✔
271
    for (auto it = templateItems.begin(); it != templateItems.end(); ++it) {
30✔
272
        const auto& item = *it;
26✔
273
        auto id = templateItems.isObject() ? it.key().asString() : item["id"].asString();
26✔
274
        bool duplicate = false;
26✔
275
        for (const auto& item: registerList) {
67✔
276
            if (item.first == id) {
42✔
277
                duplicate = true;
1✔
278
                break;
1✔
279
            }
280
        }
281
        if (duplicate || item["address"].isNull() || item["readonly"].asBool() || !knownItems[id].isNull()) {
26✔
282
            continue;
5✔
283
        }
284
        if (!fwVersion.empty()) {
21✔
285
            std::string fw = item["fw"].asString();
12✔
286
            if (!fw.empty() && util::CompareVersionStrings(fw, fwVersion) > 0) {
12✔
287
                continue;
4✔
288
            }
289
        }
290
        auto config = LoadRegisterConfig(item,
291
                                         *protocolParams.protocol->GetRegTypes(),
17✔
292
                                         std::string(),
34✔
293
                                         *protocolParams.factory,
17✔
294
                                         protocolParams.factory->GetRegisterAddressFactory().GetBaseRegisterAddress(),
17✔
295
                                         0);
34✔
296
        auto reg = std::make_shared<TRegister>(device, config.RegisterConfig);
17✔
297
        reg->SetAvailable(TRegisterAvailability::AVAILABLE);
17✔
298
        registerList.push_back(std::make_pair(id, reg));
17✔
299
    }
300
    return registerList;
4✔
301
}
302

UNCOV
303
void ReadRegisterList(TPort& port,
×
304
                      PSerialDevice device,
305
                      TRPCRegisterList& registerList,
306
                      Json::Value& result,
307
                      int maxRetries)
308
{
UNCOV
309
    if (registerList.size() == 0) {
×
UNCOV
310
        return;
×
311
    }
312
    TRegisterComparePredicate compare;
313
    std::sort(registerList.begin(),
×
314
              registerList.end(),
UNCOV
315
              [compare](std::pair<std::string, PRegister>& a, std::pair<std::string, PRegister>& b) {
×
UNCOV
316
                  return compare(b.second, a.second);
×
317
              });
318

UNCOV
319
    std::string error;
×
320
    for (int i = 0; i <= maxRetries; i++) {
×
321
        try {
322
            device->Prepare(port, TDevicePrepareMode::WITHOUT_SETUP);
×
323
            break;
×
324
        } catch (const TSerialDeviceException& e) {
×
325
            if (i == maxRetries) {
×
326
                error = std::string("Failed to prepare session: ") + e.what();
×
UNCOV
327
                LOG(Warn) << port.GetDescription() << " " << device->ToString() << ": " << error;
×
UNCOV
328
                throw TRPCException(error, TRPCResultCode::RPC_WRONG_PARAM_VALUE);
×
329
            }
330
        }
331
    }
332

333
    size_t index = 0;
×
334
    while (index < registerList.size() && error.empty()) {
×
335
        auto first = registerList[index].second;
×
336
        auto range = device->CreateRegisterRange();
×
UNCOV
337
        while (index < registerList.size() &&
×
338
               range->Add(port, registerList[index].second, std::chrono::milliseconds::max()))
×
339
        {
340
            ++index;
×
341
        }
342
        for (int i = 0; i <= maxRetries; ++i) {
×
343
            try {
344
                device->ReadRegisterRange(port, range, true);
×
345
                break;
×
346
            } catch (const TSerialDeviceException& e) {
×
347
                if (i == maxRetries) {
×
UNCOV
348
                    error = "Failed to read " + std::to_string(range->RegisterList().size()) +
×
UNCOV
349
                            " registers starting from <" + first->GetConfig()->ToString() + ">: " + e.what();
×
350
                }
351
            }
352
        }
353
    }
354

355
    try {
356
        device->EndSession(port);
×
UNCOV
357
    } catch (const TSerialDeviceException& e) {
×
UNCOV
358
        LOG(Warn) << port.GetDescription() << " " << device->ToString() << " unable to end session: " << e.what();
×
359
    }
360

361
    if (!error.empty()) {
×
UNCOV
362
        LOG(Warn) << port.GetDescription() << " " << device->ToString() << ": " << error;
×
UNCOV
363
        throw TRPCException(error, TRPCResultCode::RPC_WRONG_PARAM_VALUE);
×
364
    }
365

366
    for (size_t i = 0; i < registerList.size(); ++i) {
×
UNCOV
367
        auto& reg = registerList[i];
×
UNCOV
368
        result[reg.first] = RawValueToJSON(*reg.second->GetConfig(), reg.second->GetValue());
×
369
    }
370
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc