• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

intel / mfd-network-adapter / 18744722160

23 Oct 2025 10:00AM UTC coverage: 92.781% (-0.08%) from 92.86%
18744722160

Pull #27

github

web-flow
Merge e5659dd1c into ccf36e51c
Pull Request #27: feat: Add devlink commands and RSS queues settings

60 of 70 new or added lines in 3 files covered. (85.71%)

38 existing lines in 4 files now uncovered.

8496 of 9157 relevant lines covered (92.78%)

3.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.13
/mfd_network_adapter/network_interface/linux.py
1
# Copyright (C) 2025 Intel Corporation
2
# SPDX-License-Identifier: MIT
3
"""Module for Network Interface for Linux."""
4✔
4

5
import logging
4✔
6
import re
4✔
7
from dataclasses import fields
4✔
8
from typing import Dict, Optional, TYPE_CHECKING, Union
4✔
9

10
from mfd_common_libs import add_logging_level, log_levels
4✔
11
from mfd_connect.base import ConnectionCompletedProcess
4✔
12
from mfd_connect.exceptions import ConnectionCalledProcessError
4✔
13
from mfd_kernel_namespace import add_namespace_call_command
4✔
14
from mfd_typing import MACAddress
4✔
15
from mfd_typing.driver_info import DriverInfo
4✔
16
from mfd_typing.network_interface import LinuxInterfaceInfo, VsiInfo
4✔
17

18
from mfd_network_adapter import NetworkAdapterOwner
4✔
19
from .base import NetworkInterface
4✔
20
from .data_structures import RingBufferSettings, RingBuffer
4✔
21
from .exceptions import (
4✔
22
    BrandingStringException,
23
    DeviceStringException,
24
    NetworkQueuesException,
25
    RDMADeviceNotFound,
26
    NumaNodeException,
27
    RingBufferException,
28
    RingBufferSettingException,
29
    FirmwareVersionNotFound,
30
    DeviceSetupException,
31
)
32
from ..api.basic.linux import get_mac_address
4✔
33
from ..exceptions import NetworkAdapterConfigurationException
4✔
34

35
if TYPE_CHECKING:
36
    from mfd_connect import Connection
37
    from mfd_model.config import NetworkInterfaceModelBase
38
    from mfd_libibverbs_utils import IBVDevices
39

40
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
4✔
41
# Presumably registers the custom MODULE_DEBUG level that logger.log(...) calls in this file pass as `level`.
add_logging_level(level_name="MODULE_DEBUG", level_value=log_levels.MODULE_DEBUG)
4✔
42

43

44
class LinuxNetworkInterface(NetworkInterface):
4✔
45
    """Class to handle Network Interface in Linux."""
46

47
    _ibv_devices: "IBVDevices" = None
4✔
48

49
    def __init__(
        self,
        owner: "NetworkAdapterOwner" = None,
        interface_info: LinuxInterfaceInfo = None,  # None should be removed with the owner
        topology: "NetworkInterfaceModelBase | None" = None,
        *,  # should be moved as a first arg with owner removal
        connection: "Connection" = None,
        **kwargs,
    ) -> None:
        """
        Initialise the Linux network interface by delegating to the base class constructor.

        :param owner: NetworkAdapterOwner object
        :param interface_info: LinuxInterfaceInfo object
        :param topology: NetworkInterfaceModelBase object
        :param connection: Connection object (keyword-only)
        :param kwargs: Additional keyword arguments, forwarded unchanged to the base constructor
        """
        base_arguments = {
            "connection": connection,
            "owner": owner,
            "interface_info": interface_info,
            "topology": topology,
        }
        super().__init__(**base_arguments, **kwargs)
67

68
    @property
    def namespace(self) -> Union[str, None]:
        """
        Namespace of the interface.

        :return: The ``namespace`` attribute of the stored interface info
        """
        interface_info = self._interface_info
        return interface_info.namespace
72

73
    @property
    def vsi_info(self) -> Union[VsiInfo, None]:
        """
        VSI info of the interface.

        :return: The ``vsi_info`` attribute of the stored interface info
        """
        interface_info = self._interface_info
        return interface_info.vsi_info
77

78
    def get_branding_string(self) -> str:
        """
        Read the branding string of the device from verbose lspci output.

        The value is the text following ``Subsystem: `` in the output of ``lspci -s <address> -v``.

        :return: Branding string with trailing whitespace removed
        :raises BrandingStringException: if no ``Subsystem:`` entry is found in the output
        """
        pci_slot = self.pci_address.lspci
        lspci_command = add_namespace_call_command(f"lspci -s {pci_slot} -v", self.namespace)
        output = self._connection.execute_command(lspci_command).stdout
        found = re.search(r"\s*Subsystem: (?P<branding_string>.+)", output)
        if found:
            return found.group("branding_string").rstrip()
        raise BrandingStringException(
            f"No matching branding string found for pci address: {self.pci_address.lspci}"
        )
94

95
    def get_device_string(self) -> str:
        """
        Get device string.

        Runs ``lspci -s <address> -v`` and returns the text that follows the device class
        (``Ethernet controller`` or ``Class <number>``) after a ``bus:device.function`` prefix.

        :return: Device string with trailing whitespace removed
        :raises DeviceStringException: if device string not found
        """
        command = add_namespace_call_command(f"lspci -s {self.pci_address.lspci} -v", self.namespace)
        # lspci prints bus, device and function in hexadecimal (e.g. "3b:00.0", "af:00.1"), so the
        # address prefix has to accept hex digits and a literal dot; a decimal-only prefix never
        # matches such addresses and the lookup would fail even though the device is listed.
        device_string_pattern = (
            r"[0-9a-fA-F]+:[0-9a-fA-F]+\.[0-9a-fA-F]\s*(?:Ethernet controller|Class \d+): (?P<device_string>.+)"
        )
        lspci_output = self._connection.execute_command(command).stdout
        match = re.search(device_string_pattern, lspci_output)
        if not match:
            raise DeviceStringException(f"No matching device string found for pci address: {self.pci_address.lspci}")
        return match.group("device_string").rstrip()
109

110
    def get_mac_address(self) -> MACAddress:
        """
        Get MAC Address of interface.

        Deprecated: a warning is logged on every call; use ``interface.mac.get_mac()`` instead.

        :return: MACAddress
        """
        # The closing backtick after get_mac() was missing in the message, leaving the markup unbalanced.
        logger.warning(
            "This API is deprecated - `interface.get_mac_address()`. Use `interface.mac.get_mac()` instead."
        )
        # Inside the method body this name resolves to the module-level helper imported from
        # ..api.basic.linux, not to this method.
        return get_mac_address(self._connection, interface_name=self.name, namespace=self.namespace)
118

119
    def get_network_queues(self) -> Dict:
        """
        Read the queue (channel) counts of the interface with ``ethtool -l``.

        The output is expected to contain a line ending with ``settings:`` followed directly by the
        ``RX:``, ``TX:``, ``Other:`` and ``Combined:`` lines, in that order.

        :return: Dictionary with keys ``rx``, ``tx``, ``other`` and ``combined``; each value is an int,
                 or None when ethtool reports ``n/a``
        :raises NetworkQueuesException: if the expected lines are not found in the ethtool output
        """
        command = add_namespace_call_command(f"ethtool -l {self.name}", self.namespace)
        output = self._connection.execute_command(command).stdout

        per_type_lines = "".join(
            rf"\n{label}:\s+(?P<{label.lower()}>[0-9]+|n/a)" for label in ("RX", "TX", "Other", "Combined")
        )
        channels_regex = re.compile(r"settings:" + per_type_lines)

        parsed = {}
        # If several sections match, the last one wins.
        for section in channels_regex.finditer(output):
            parsed = {
                queue: int(count) if count.isdigit() else None for queue, count in section.groupdict().items()
            }
        if parsed:
            return parsed
        raise NetworkQueuesException(f"Could not read network queues for interface {self.name}")
144

145
    def set_network_queues(
        self,
        rx: Optional[int] = None,
        tx: Optional[int] = None,
        other: Optional[int] = None,
        combined: Optional[int] = None,
    ) -> None:
        """
        Set the queue (channel) counts of the interface with ``ethtool -L``.

        Only the queue types given a value are added to the command.

        :param rx: Value to set for RX queues. If not provided no value will be set
        :param tx: Value to set for TX queues. If not provided no value will be set
        :param other: Value to set for Other queues. If not provided no value will be set
        :param combined: Value to set for Combined queues. If not provided no value will be set
        :raises NetworkQueuesException: if no values were passed or the command returned a non-zero code
        """
        base_command = f"ethtool -L {self.name}"
        requested = (("rx", rx), ("tx", tx), ("other", other), ("combined", combined))
        options = "".join(f" {queue} {count}" for queue, count in requested if count is not None)
        if not options:
            raise NetworkQueuesException("No values set to queues")

        completed = self._connection.execute_command(
            add_namespace_call_command(base_command + options, namespace=self.namespace)
        )
        if completed.return_code:
            raise NetworkQueuesException(
                f"Failed to set network queues for interface {self.name}.\n{completed.stdout}"
            )
180

181
    @property
    def ibv_devices(self) -> "IBVDevices":
        """
        IBVDevices tool, created on first access and reused afterwards.

        :return: IBVDevices
        """
        if self._ibv_devices is not None:
            return self._ibv_devices

        # Imported only when first needed; at module level the name is imported under TYPE_CHECKING only.
        from mfd_libibverbs_utils import IBVDevices

        self._ibv_devices = IBVDevices(connection=self._connection)
        return self._ibv_devices
193

194
    def get_rdma_device_name(self) -> str:
        """
        Find the RDMA device name that belongs to this network interface.

        Two sysfs locations under ``/sys/class/net/<name>/device`` are listed in turn; the first
        non-empty listing is returned. When neither yields a name, the devices reported by the
        IBVDevices tool are logged and the lookup fails.

        :raises RDMADeviceNotFound: if no device is found for the interface.
        :return: Read RDMA device name.
        """
        logger.log(level=log_levels.MODULE_DEBUG, msg=f"Getting RDMA device name for {self.name}")
        sysfs_lookups = (
            f"ls /sys/class/net/{self.name}/device/infiniband/ 2>/dev/null",
            f"ls /sys/class/net/{self.name}/device/ice.roce.*/infiniband/ 2>/dev/null",
        )
        for lookup in sysfs_lookups:
            try:
                listing = self._connection.execute_command(lookup, shell=True).stdout.strip()
            except ConnectionCalledProcessError:
                # A failing lookup is ignored; the next location is tried.
                continue
            if not listing:
                continue
            logger.log(level=log_levels.MODULE_DEBUG, msg=f"Found {listing} RDMA device name for {self.name}")
            return listing

        known_devices = self.ibv_devices.get_list()
        logger.log(level=log_levels.MODULE_DEBUG, msg=f"Found RDMA devices: {known_devices}")
        raise RDMADeviceNotFound(f"Failed to find RDMA device for {self.name}")
219

220
    def get_numa_node(self) -> int:
        """
        Get the Non-Uniform Memory Architecture NUMA Node of the network interface.

        The value is read from ``/sys/class/net/<name>/device/numa_node``.

        :raises NumaNodeException: if reading the numa_node file fails for the interface.
        :return (int): NUMA node of the test network interface.
        """
        try:
            node = self._connection.execute_command(
                add_namespace_call_command(f"cat /sys/class/net/{self.name}/device/numa_node", namespace=self.namespace)
            ).stdout
        except ConnectionCalledProcessError as error:
            # Chain the original failure so the underlying command error stays visible in the traceback.
            raise NumaNodeException(f"NUMA node cannot be determined for interface: {self.name}") from error
        # int() tolerates the surrounding whitespace/newline of the command output.
        return int(node)
234

235
    def get_ring_settings(self) -> RingBufferSettings:
        """
        Read the ring buffer settings of the interface with ``ethtool -g``.

        The output is split into the "pre-set maximums" and "current hardware settings" sections.
        For every dataclass field of each settings object, a ``<field name with spaces>: <number>``
        entry is looked up (case-insensitively) in the matching section; fields without such an
        entry keep the value they were created with.

        :return: RingBufferSettings obj with current and max settings.
        :raises RingBufferException: when the two sections cannot be found in the ethtool output.
        """
        command = add_namespace_call_command(f"ethtool -g {self.name}", namespace=self.namespace)
        output = self._connection.execute_command(command).stdout.strip()

        sections_pattern = (
            r"pre-set maximums:(?P<maximum_settings>.*)current hardware settings:(?P<current_settings>.*)"
        )
        sections = re.search(sections_pattern, output, re.I | re.S | re.M)
        if sections is None:
            raise RingBufferException(f"Regex expression {sections_pattern} didn't match ethtool output {output}")

        ring_settings = RingBufferSettings()
        for section_name, ring_buffer in vars(ring_settings).items():
            for ring_field in fields(ring_buffer):
                label = ring_field.name.replace("_", " ")
                value_match = re.search(
                    rf"{label}:\s+(?P<setting_value>\d+)", sections[f"{section_name}_settings"], re.I
                )
                if value_match is None:
                    continue
                setattr(ring_buffer, ring_field.name, int(value_match["setting_value"]))

        return ring_settings
266

267
    def set_ring_settings(self, settings: RingBuffer) -> None:
        """
        Apply ring buffer settings with ``ethtool -G``.

        The ``repr()`` of ``settings`` is appended to the command as its arguments.

        :param settings: RingBuffer obj with values to be set.
        """
        command = add_namespace_call_command(f"ethtool -G {self.name} {settings!r}", namespace=self.namespace)
        self._connection.execute_command(command, custom_exception=RingBufferSettingException)
277

278
    def get_firmware_version(self) -> str:
        """
        Read the firmware version reported by ``ethtool -i``.

        Returns the remainder of the ``firmware-version:`` line of the output as is.

        :return: Firmware version
        :raises FirmwareVersionNotFound: If firmware version was not found
        """
        ethtool_output = self._connection.execute_command(
            add_namespace_call_command(f"ethtool -i {self.name}", self.namespace)
        ).stdout

        found = re.search(r"firmware-version: (?P<firmware_version>.+)", ethtool_output, re.MULTILINE)
        if found:
            return found.group("firmware_version")
        raise FirmwareVersionNotFound(f"Can't find firmware version for [{self.name}]!")
293

294
    def get_driver_info(self) -> DriverInfo:
        """
        Get information about driver name and version.

        Delegates to ``self.driver.get_driver_info()``.

        :return: DriverInfo dataclass that contains driver_name and driver_version
        :raises: DriverInfoNotFound if failed (per the original documentation; raised by the delegate, not here).
        """
        driver_feature = self.driver
        return driver_feature.get_driver_info()
302

303
    def get_number_of_ports(self) -> int:
        """
        Count the ports of the tested adapter.

        Lists the lspci entries containing "Eth", keeps the text after the last colon of each line,
        counts identical entries with ``uniq -c`` and takes the leading count of the output filtered
        by this interface's device string.

        :return: Number of ports in tested adapter
        :raise: DeviceSetupException: when the output does not start with a number
        """
        device_string = self.get_device_string()
        completed = self._connection.execute_command(
            command=f"lspci | grep Eth | awk -F ':' '{{print $NF}}' | uniq -c | grep '{device_string}'",
            shell=True,
            expected_return_codes={0},
        )

        count_match = re.search(r"^\s*(?P<number_ports>\d+)", completed.stdout)
        if not count_match:
            raise DeviceSetupException("Can't find number of ports in tested adapter.")
        return int(count_match.group("number_ports"))
321

322
    def reload_adapter_devlink(self) -> ConnectionCompletedProcess:
        """
        Reload the adapter with ``devlink dev reload``.

        The command is run with ``expected_return_codes={0, 1}``.

        :return: ConnectionCompletedProcess
        """
        logger.log(level=log_levels.MFD_DEBUG, msg=f"Reloading adapter {self.name} using devlink")
        reload_command = f"devlink dev reload pci/{self.pci_address}"
        return self._connection.execute_command(reload_command, expected_return_codes={0, 1})
332

333
    def get_msix_vectors_count(self, method: str = "devlink") -> int | None:
        """
        Get number of MSI-X vectors for the given interface.

        With ``devlink`` the value is taken from the ``name msix_vf size <N>`` entry of
        ``devlink resource show``; with ``sysfs`` it is read from the device's ``sriov_vf_msix_count`` file.

        :param method: Method to use for getting MSI-X vectors count. Options are "devlink" or "sysfs".
        :return: Number of MSI-X vectors available for the interface. None if number of vectors could not be determined.
        :raises ValueError: if ``method`` is neither "devlink" nor "sysfs"
        """
        logger.log(level=log_levels.MFD_DEBUG, msg=f"Getting MSI-X vectors count for interface {self.name}")
        if method == "devlink":
            out = self._connection.execute_command(f"devlink resource show pci/{self.pci_address}").stdout
            match = re.search(r"name msix_vf size (\d+) ", out)
            vectors = match.group(1) if match else ""
        elif method == "sysfs":
            out = self._connection.execute_command(
                f"cat /sys/bus/pci/devices/{self.pci_address}/sriov_vf_msix_count"
            ).stdout
            # Strip the trailing newline of `cat`, so whitespace-only output counts as "not found"
            # instead of failing in int(), and the log line stays on a single line.
            vectors = out.strip()
        else:
            raise ValueError(f"Unknown method {method} for getting MSI-X vectors count")

        if not vectors:
            logger.error(f"Could not find MSI-X vectors count for interface {self.name}")
            return None
        logger.log(level=log_levels.MFD_INFO, msg=f"MSI-X vectors count for interface {self.name}: {vectors}")
        return int(vectors)
364

365
    def set_msix_vectors_count(self, count: int, method: str = "devlink") -> None:
        """
        Set number of MSI-X vectors for the given interface.

        With ``devlink`` the ``/msix/msix_vf/`` resource size is set; with ``sysfs`` the count is
        written to the device's ``sriov_vf_msix_count`` file.

        :param count: Number of MSI-X vectors to set
        :param method: Method to use for setting MSI-X vectors count. Options are "devlink" or "sysfs".
        :return: None
        :raises ValueError: if ``method`` is neither "devlink" nor "sysfs"
        """
        logger.log(level=log_levels.MFD_DEBUG, msg=f"Setting MSI-X vectors count to {count} for interface {self.name}")
        execute_options = {"custom_exception": NetworkAdapterConfigurationException}
        if method == "devlink":
            command = f"devlink resource set pci/{self.pci_address} path /msix/msix_vf/ size {count}"
        elif method == "sysfs":
            command = f"echo {count} > /sys/bus/pci/devices/{self.pci_address}/sriov_vf_msix_count"
            # The `>` redirection is shell syntax; request a shell as the other shell-syntax
            # commands in this class do, so the value is written to the file rather than echoed.
            execute_options["shell"] = True
        else:
            raise ValueError(f"Unknown method {method} for setting MSI-X vectors count")
        self._connection.execute_command(command, **execute_options)
        logger.log(level=log_levels.MFD_INFO, msg=f"MSI-X vectors count set to {count} for interface {self.name}")
382

383
    def set_rss_queues_count(self, count: int, vf: NetworkInterface | None = None) -> None:
        """
        Set number of RSS queues for the given interface.

        The count is written to ``rss_lut_pf_attr`` under ``/sys/class/net/<PF name>/device/``; for a VF
        the file is addressed through the PF's ``virtfn<N>/`` sub-directory, where N is the VF id
        resolved from the VF's PCI address.

        :param count: Number of RSS queues to set
        :param vf: VF NetworkInterface object or None for PF
        :return: None
        """
        target_name = self.name if vf is None else vf.name
        logger.log(
            level=log_levels.MFD_DEBUG,
            msg=f"Setting RSS queues count to {count} for interface {target_name}",
        )
        vf_path = ""
        if vf is not None:
            vf_num = str(self.virtualization.get_vf_id_by_pci(vf.pci_address))
            vf_path = f"virtfn{vf_num}/"
        # NOTE(review): the same attribute name (rss_lut_pf_attr) is used under virtfn<N>/ —
        # confirm against the driver's sysfs documentation.
        self._connection.execute_command(
            f"echo {count} > /sys/class/net/{self.name}/device/{vf_path}rss_lut_pf_attr",
            # The `>` redirection is shell syntax; request a shell as the other shell-syntax
            # commands in this class do, so the value is written to the file rather than echoed.
            shell=True,
            custom_exception=NetworkAdapterConfigurationException,
        )
        logger.log(
            level=log_levels.MFD_INFO,
            msg=f"Successfully set RSS queues count on interface {target_name}",
        )
408

409
    def get_rss_queues_count(self, vf: NetworkInterface | None = None) -> int:
        """
        Read the number of RSS queues of the given interface.

        The value is read from ``rss_lut_pf_attr`` under ``/sys/class/net/<PF name>/device/``; for a VF
        the file is addressed through the PF's ``virtfn<N>/`` sub-directory.

        :param vf: VF NetworkInterface object or None for PF
        :return: Number of RSS queues
        """
        reported_name = vf.name if vf is not None else self.name
        logger.log(
            level=log_levels.MFD_DEBUG,
            msg=f"Retrieving RSS queues count of interface {reported_name}",
        )
        if vf is None:
            device_subdir = ""
        else:
            vf_id = str(self.virtualization.get_vf_id_by_pci(vf.pci_address))
            device_subdir = f"virtfn{vf_id}/"
        completed = self._connection.execute_command(
            f"cat /sys/class/net/{self.name}/device/{device_subdir}rss_lut_pf_attr",
            expected_return_codes={0},
        )
        return int(completed.stdout)
430

431
    def set_sriov_drivers_autoprobe(self, state: bool) -> None:
        """
        Enable or disable SRIOV drivers auto probe.

        Writes ``1`` (enable) or ``0`` (disable) to
        ``/sys/class/net/<name>/device/sriov_drivers_autoprobe``.

        :param state: State to set (True or False)
        :return: None
        """
        logger.log(
            level=log_levels.MFD_DEBUG,
            msg=f"{'Enabling' if state else 'Disabling'} sriov_drivers_autoprobe on interface {self.name}",
        )
        self._connection.execute_command(
            f"echo {int(state)} > /sys/class/net/{self.name}/device/sriov_drivers_autoprobe",
            # The `>` redirection is shell syntax; request a shell as the other shell-syntax
            # commands in this class do, so the value is written to the file rather than echoed.
            shell=True,
            custom_exception=NetworkAdapterConfigurationException,
        )
        status = "enabled" if state else "disabled"
        logger.log(
            level=log_levels.MFD_INFO,
            msg=f"Successfully {status} sriov_drivers_autoprobe on interface {self.name}",
        )
451

452
    def restart(self) -> None:
        """
        Restart interface.

        :raises NotImplementedError: always
        """
        raise NotImplementedError()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc