• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

intel / mfd-network-adapter / 18743592086

23 Oct 2025 09:16AM UTC coverage: 92.781% (-0.08%) from 92.86%
18743592086

Pull #27

github

web-flow
Merge 040ed79f6 into ccf36e51c
Pull Request #27: feat: Add devlink commands and RSS queues settings

60 of 70 new or added lines in 3 files covered. (85.71%)

38 existing lines in 4 files now uncovered.

8496 of 9157 relevant lines covered (92.78%)

7.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.13
/mfd_network_adapter/network_interface/linux.py
1
# Copyright (C) 2025 Intel Corporation
2
# SPDX-License-Identifier: MIT
3
"""Module for Network Interface for Linux."""
8✔
4

5
import logging
8✔
6
import re
8✔
7
from dataclasses import fields
8✔
8
from typing import Dict, Optional, TYPE_CHECKING, Union
8✔
9

10
from mfd_common_libs import add_logging_level, log_levels
8✔
11
from mfd_connect.base import ConnectionCompletedProcess
8✔
12
from mfd_connect.exceptions import ConnectionCalledProcessError
8✔
13
from mfd_kernel_namespace import add_namespace_call_command
8✔
14
from mfd_typing import MACAddress
8✔
15
from mfd_typing.driver_info import DriverInfo
8✔
16
from mfd_typing.network_interface import LinuxInterfaceInfo, VsiInfo
8✔
17

18
from mfd_network_adapter import NetworkAdapterOwner
8✔
19
from .base import NetworkInterface
8✔
20
from .data_structures import RingBufferSettings, RingBuffer
8✔
21
from .exceptions import (
8✔
22
    BrandingStringException,
23
    DeviceStringException,
24
    NetworkQueuesException,
25
    RDMADeviceNotFound,
26
    NumaNodeException,
27
    RingBufferException,
28
    RingBufferSettingException,
29
    FirmwareVersionNotFound,
30
    DeviceSetupException,
31
)
32
from ..api.basic.linux import get_mac_address
8✔
33
from ..exceptions import NetworkAdapterConfigurationException
8✔
34

35
if TYPE_CHECKING:
36
    from mfd_connect import Connection
37
    from mfd_model.config import NetworkInterfaceModelBase
38
    from mfd_libibverbs_utils import IBVDevices
39

40
logger = logging.getLogger(__name__)
8✔
41
add_logging_level(level_name="MODULE_DEBUG", level_value=log_levels.MODULE_DEBUG)
8✔
42

43

44
class LinuxNetworkInterface(NetworkInterface):
8✔
45
    """Class to handle Network Interface in Linux."""
46

47
    _ibv_devices: "IBVDevices" = None
8✔
48

49
    def __init__(
8✔
50
        self,
51
        owner: "NetworkAdapterOwner" = None,
52
        interface_info: LinuxInterfaceInfo = None,  # None should be removed with the owner
53
        topology: "NetworkInterfaceModelBase | None" = None,
54
        *,  # should be moved as a first arg with owner removal
55
        connection: "Connection" = None,
56
        **kwargs,
57
    ) -> None:
58
        """
59
        Linux Network Interface Constructor.
60

61
        :param owner: NetworkAdapterOwner object
62
        :param interface_info: InterfaceInfo object
63
        :param topology: NetworkInterfaceModelBase object
64
        :param connection: Connection object
65
        """
66
        super().__init__(connection=connection, owner=owner, interface_info=interface_info, topology=topology, **kwargs)
8✔
67

68
    @property
8✔
69
    def namespace(self) -> Union[str, None]:
8✔
70
        """Get namespace."""
71
        return self._interface_info.namespace
8✔
72

73
    @property
8✔
74
    def vsi_info(self) -> Union[VsiInfo, None]:
8✔
75
        """Get VSI Info."""
UNCOV
76
        return self._interface_info.vsi_info
×
77

78
    def get_branding_string(self) -> str:
8✔
79
        """
80
        Get branding string.
81

82
        :return: Branding string
83
        :raises BrandingStringException: if branding string not found
84
        """
85
        command = add_namespace_call_command(f"lspci -s {self.pci_address.lspci} -v", self.namespace)
8✔
86
        subsystem_pattern = r"\s*Subsystem: (?P<branding_string>.+)"
8✔
87
        lspci_output = self._connection.execute_command(command).stdout
8✔
88
        match = re.search(subsystem_pattern, lspci_output)
8✔
89
        if not match:
8✔
90
            raise BrandingStringException(
8✔
91
                f"No matching branding string found for pci address: {self.pci_address.lspci}"
92
            )
93
        return match.group("branding_string").rstrip()
8✔
94

95
    def get_device_string(self) -> str:
8✔
96
        """
97
        Get device string.
98

99
        :return: Device string
100
        :raises DeviceStringException: if device string not found
101
        """
102
        command = add_namespace_call_command(f"lspci -s {self.pci_address.lspci} -v", self.namespace)
8✔
103
        device_string_pattern = r"\d+:\d+.\d\s*(?:Ethernet controller|Class \d+): (?P<device_string>.+)"
8✔
104
        lspci_output = self._connection.execute_command(command).stdout
8✔
105
        match = re.search(device_string_pattern, lspci_output)
8✔
106
        if not match:
8✔
107
            raise DeviceStringException(f"No matching device string found for pci address: {self.pci_address.lspci}")
8✔
108
        return match.group("device_string").rstrip()
8✔
109

110
    def get_mac_address(self) -> MACAddress:
8✔
111
        """
112
        Get MAC Address of interface.
113

114
        :return: MACAddress
115
        """
116
        logger.warning("This API is deprecated - `interface.get_mac_address()`. Use `interface.mac.get_mac() instead.")
×
UNCOV
117
        return get_mac_address(self._connection, interface_name=self.name, namespace=self.namespace)
×
118

119
    def get_network_queues(self) -> Dict:
8✔
120
        """
121
        Get network queue values for network interface using ethtool --show-channels.
122

123
        :return: Dictionary containing queue values
124
        :raises NetworkQueuesException: if queue values could not be read from the ethtool output
125
        """
126
        ethtool_command = add_namespace_call_command(f"ethtool -l {self.name}", self.namespace)
8✔
127
        ethtool_output = self._connection.execute_command(ethtool_command).stdout
8✔
128
        queues_pattern = r"settings:"
8✔
129
        queue_types = ["RX", "TX", "Other", "Combined"]
8✔
130
        for queue_type in queue_types:
8✔
131
            queues_pattern += rf"\n{queue_type}:\s+(?P<{queue_type.lower()}>[0-9]+|n/a)"
8✔
132

133
        queues_pattern = re.compile(queues_pattern)
8✔
134

135
        queues = {}
8✔
136
        for match in queues_pattern.finditer(ethtool_output):
8✔
137
            queues = match.groupdict()
8✔
138
            for key, value in queues.items():
8✔
139
                _value = int(value) if value.isdigit() else None
8✔
140
                queues[key] = _value
8✔
141
        if not queues:
8✔
142
            raise NetworkQueuesException(f"Could not read network queues for interface {self.name}")
8✔
143
        return queues
8✔
144

145
    def set_network_queues(
8✔
146
        self,
147
        rx: Optional[int] = None,
148
        tx: Optional[int] = None,
149
        other: Optional[int] = None,
150
        combined: Optional[int] = None,
151
    ) -> None:
152
        """
153
        Set network queues for network interface using ethtool --set-channels.
154

155
        :param rx: Value to set for RX queues. If not provided no value will be set
156
        :param tx: Value to set for TX queues. If not provided no value will be set
157
        :param other: Value to set for Other queues. If not provided no value will be set
158
        :param combined: Value to set for Combined queues. If not provided no value will be set
159
        :raises NetworkQueuesException if no values passed or command failed.
160
        """
161
        start_command = f"ethtool -L {self.name}"
8✔
162
        ethtool_command = start_command
8✔
163
        if rx is not None:
8✔
164
            ethtool_command += f" rx {rx}"
8✔
165
        if tx is not None:
8✔
UNCOV
166
            ethtool_command += f" tx {tx}"
×
167
        if other is not None:
8✔
UNCOV
168
            ethtool_command += f" other {other}"
×
169
        if combined is not None:
8✔
UNCOV
170
            ethtool_command += f" combined {combined}"
×
171
        if start_command == ethtool_command:
8✔
172
            raise NetworkQueuesException("No values set to queues")
8✔
173

174
        result = self._connection.execute_command(add_namespace_call_command(ethtool_command, namespace=self.namespace))
8✔
175
        rc = result.return_code
8✔
176
        if rc:
8✔
177
            raise NetworkQueuesException(
8✔
178
                f"Failed to set network queues for interface {self.name}." f"\n{result.stdout}"
179
            )
180

181
    @property
8✔
182
    def ibv_devices(self) -> "IBVDevices":
8✔
183
        """
184
        Tool IBVDevices property established with first usage.
185

186
        :return IBVDevices
187
        """
188
        if self._ibv_devices is None:
8✔
189
            from mfd_libibverbs_utils import IBVDevices
8✔
190

191
            self._ibv_devices = IBVDevices(connection=self._connection)
8✔
192
        return self._ibv_devices
8✔
193

194
    def get_rdma_device_name(self) -> str:
8✔
195
        """
196
        Get RDMA device name for network interface.
197

198
        :raises RDMADeviceNotFound: if not found device for interface.
199
        :return: Read RDMA device name.
200
        """
201
        logger.log(level=log_levels.MODULE_DEBUG, msg=f"Getting RDMA device name for {self.name}")
8✔
202
        rdma_dev_cmd = (
8✔
203
            f"ls /sys/class/net/{self.name}/device/infiniband/ 2>/dev/null",
204
            f"ls /sys/class/net/{self.name}/device/ice.roce.*/infiniband/ 2>/dev/null",
205
        )
206
        for cmd in rdma_dev_cmd:
8✔
207
            try:
8✔
208
                rdma_device = self._connection.execute_command(cmd, shell=True).stdout.strip()
8✔
209
                if rdma_device:
8✔
210
                    logger.log(
8✔
211
                        level=log_levels.MODULE_DEBUG, msg=f"Found {rdma_device} RDMA device name for {self.name}"
212
                    )
213
                    return rdma_device
8✔
214
            except ConnectionCalledProcessError:
×
UNCOV
215
                pass
×
216
        devices = self.ibv_devices.get_list()
8✔
217
        logger.log(level=log_levels.MODULE_DEBUG, msg=f"Found RDMA devices: {devices}")
8✔
218
        raise RDMADeviceNotFound(f"Failed to find RDMA device for {self.name}")
8✔
219

220
    def get_numa_node(self) -> int:
8✔
221
        """
222
        Get the Non-Uniform Memory Architecture NUMA Node of the network interface.
223

224
        :raises NumaNodeException: if numa file is not present for interface.
225
        :return (int): NUMA node of the test network interface.
226
        """
227
        try:
8✔
228
            node = self._connection.execute_command(
8✔
229
                add_namespace_call_command(f"cat /sys/class/net/{self.name}/device/numa_node", namespace=self.namespace)
230
            ).stdout
231
        except ConnectionCalledProcessError:
×
UNCOV
232
            raise NumaNodeException(f"NUMA node cannot be determined for interface: {self.name}")
×
233
        return int(node)
8✔
234

235
    def get_ring_settings(self) -> RingBufferSettings:
8✔
236
        """
237
        Get ring buffer settings.
238

239
        :return: RingBufferSettings obj with current and max settings.
240
        :raises RingBufferException when regex didn't match ethtool output.
241
        """
242
        ethtool_output = self._connection.execute_command(
8✔
243
            add_namespace_call_command(f"ethtool -g {self.name}", namespace=self.namespace)
244
        ).stdout.strip()
245
        ethtool_output_pattern = (
8✔
246
            r"pre-set maximums:(?P<maximum_settings>.*)current hardware settings:(?P<current_settings>.*)"
247
        )
248

249
        matched_settings = re.search(ethtool_output_pattern, ethtool_output, re.I | re.S | re.M)
8✔
250
        if matched_settings is None:
8✔
UNCOV
251
            raise RingBufferException(
×
252
                f"Regex expression {ethtool_output_pattern} didn't match ethtool output {ethtool_output}"
253
            )
254

255
        settings = RingBufferSettings()
8✔
256
        for settings_type_name, settings_type in settings.__dict__.items():
8✔
257
            for setting in fields(settings_type):
8✔
258
                setting_pattern = rf'{setting.name.replace("_", " ")}:\s+(?P<setting_value>\d+)'
8✔
259
                matched_setting_value = re.search(
8✔
260
                    setting_pattern, matched_settings[f"{settings_type_name}_settings"], re.I
261
                )
262
                if matched_setting_value:
8✔
263
                    setattr(settings_type, setting.name, int(matched_setting_value["setting_value"]))
8✔
264

265
        return settings
8✔
266

267
    def set_ring_settings(self, settings: RingBuffer) -> None:
8✔
268
        """
269
        Set ring buffer settings.
270

271
        :param settings: RingBuffer obj with values to be set.
272
        """
273
        self._connection.execute_command(
8✔
274
            add_namespace_call_command(f"ethtool -G {self.name} {settings!r}", namespace=self.namespace),
275
            custom_exception=RingBufferSettingException,
276
        )
277

278
    def get_firmware_version(self) -> str:
8✔
279
        """
280
        Get firmware version with ethtool (nvm version, eetrack_id, combo_boot_image_version).
281

282
        :return: Firmware version
283
        :raises FirmwareVersionNotFound: If firmware version was not found
284
        """
285
        command_ethtool = add_namespace_call_command(f"ethtool -i {self.name}", self.namespace)
×
UNCOV
286
        interface_info = self._connection.execute_command(command_ethtool).stdout
×
287

288
        version_match = re.search(r"firmware-version: (?P<firmware_version>.+)", interface_info, re.MULTILINE)
×
289
        if not version_match:
×
UNCOV
290
            raise FirmwareVersionNotFound(f"Can't find firmware version for [{self.name}]!")
×
291

UNCOV
292
        return version_match.group("firmware_version")
×
293

294
    def get_driver_info(self) -> DriverInfo:
8✔
295
        """
296
        Get information about driver name and version by delegating to the interface's driver feature.
297

298
        :return: DriverInfo dataclass that contains driver_name and driver_version
299
        :raises: DriverInfoNotFound if failed.
300
        """
UNCOV
301
        return self.driver.get_driver_info()
×
302

303
    def get_number_of_ports(self) -> int:
8✔
304
        """
305
        Get number of ports in tested adapter.
306

307
        :return: Number of ports in tested adapter
308
        :raises DeviceSetupException: when the number of ports cannot be found
309
        """
310
        result = self._connection.execute_command(
8✔
311
            command=f"lspci | grep Eth | awk -F ':' '{{print $NF}}' | uniq -c | grep '{self.get_device_string()}'",
312
            shell=True,
313
            expected_return_codes={0},
314
        )
315

316
        regex = re.search(r"^\s*(?P<number_ports>\d+)", result.stdout)
8✔
317
        if regex:
8✔
318
            return int(regex.group("number_ports"))
8✔
319
        else:
320
            raise DeviceSetupException("Can't find number of ports in tested adapter.")
8✔
321

322
    def reload_adapter_devlink(self) -> ConnectionCompletedProcess:
8✔
323
        """
324
        Reload adapter using devlink.
325

326
        :return: ConnectionCompletedProcess
327
        """
NEW
328
        logger.log(level=log_levels.MFD_DEBUG, msg=f"Reloading adapter {self.name} using devlink")
×
NEW
329
        return self._connection.execute_command(
×
330
            f"devlink dev reload pci/{self.pci_address}", expected_return_codes={0, 1}
331
        )
332

333
    def get_msix_vectors_count(self, method: str = "devlink") -> int | None:
8✔
334
        """
335
        Get number of MSI-X vectors for the given interface.
336

337
        :param method: Method to use for getting MSI-X vectors count. Options are "devlink" or "sysfs".
338
        :return: Number of MSI-X vectors available for the interface. None if number of vectors could not be determined.
339
        """
340
        logger.log(level=log_levels.MFD_DEBUG, msg=f"Getting MSI-X vectors count for interface {self.name}")
8✔
341
        if method == "devlink":
8✔
342
            out = self._connection.execute_command(f"devlink resource show pci/{self.pci_address}").stdout
8✔
343
            match = re.search(r"name msix_vf size (\d+) ", out)
8✔
344
            if match:
8✔
345
                logger.log(
8✔
346
                    level=log_levels.MFD_INFO, msg=f"MSI-X vectors count for interface {self.name}: {match.group(1)}"
347
                )
348
                return int(match.group(1))
8✔
349
            else:
350
                logger.error(f"Could not find MSI-X vectors count for interface {self.name}")
8✔
351
                return None
8✔
352
        if method == "sysfs":
8✔
353
            out = self._connection.execute_command(
8✔
354
                f"cat /sys/bus/pci/devices/{self.pci_address}/sriov_vf_msix_count"
355
            ).stdout
356
            if out:
8✔
357
                logger.log(level=log_levels.MFD_INFO, msg=f"MSI-X vectors count for interface {self.name}: {out}")
8✔
358
                return int(out)
8✔
359
            else:
360
                logger.error(f"Could not find MSI-X vectors count for interface {self.name}")
8✔
361
                return None
8✔
362

363
        raise ValueError(f"Unknown method {method} for getting MSI-X vectors count")
8✔
364

365
    def set_msix_vectors_count(self, count: int, method: str = "devlink") -> None:
8✔
366
        """
367
        Set number of MSI-X vectors for the given interface.
368

369
        :param count: Number of MSI-X vectors to set
370
        :param method: Method to use for setting MSI-X vectors count. Options are "devlink" or "sysfs".
371
        :return: None
372
        """
373
        logger.log(level=log_levels.MFD_DEBUG, msg=f"Setting MSI-X vectors count to {count} for interface {self.name}")
8✔
374
        if method == "devlink":
8✔
375
            command = f"devlink resource set pci/{self.pci_address} path /msix/msix_vf/ size {count}"
8✔
376
        elif method == "sysfs":
8✔
377
            command = f"echo {count} > /sys/bus/pci/devices/{self.pci_address}/sriov_vf_msix_count"
8✔
378
        else:
379
            raise ValueError(f"Unknown method {method} for setting MSI-X vectors count")
8✔
380
        self._connection.execute_command(command, custom_exception=NetworkAdapterConfigurationException)
8✔
381
        logger.log(level=log_levels.MFD_INFO, msg=f"MSI-X vectors count set to {count} for interface {self.name}")
8✔
382

383
    def set_rss_queues_count(self, count: int, vf: NetworkInterface | None = None) -> None:
8✔
384
        """
385
        Set number of RSS queues for the given interface.
386

387
        :param count: Number of RSS queues to set
388
        :param vf: VF NetworkInterface object or None for PF
389
        :return: None
390
        """
391
        logger.log(
8✔
392
            level=log_levels.MFD_DEBUG,
393
            msg=f"Setting RSS queues count to {count} for interface {self.name if vf is None else vf.name}",
394
        )
395
        name = self.name
8✔
396
        vf_path = ""
8✔
397
        if vf is not None:
8✔
398
            vf_num = str(self.virtualization.get_vf_id_by_pci(vf.pci_address))
8✔
399
            vf_path = f"virtfn{vf_num}/"
8✔
400
        self._connection.execute_command(
8✔
401
            f"echo {count} > /sys/class/net/{name}/device/{vf_path}rss_lut_pf_attr",
402
            custom_exception=NetworkAdapterConfigurationException,
403
        )
404
        logger.log(
8✔
405
            level=log_levels.MFD_INFO,
406
            msg=f"Successfully set RSS queues count on interface {self.name if vf is None else vf.name}",
407
        )
408

409
    def get_rss_queues_count(self, vf: NetworkInterface | None = None) -> int:
8✔
410
        """
411
        Get number of RSS queues of the given interface.
412

413
        :param vf: VF NetworkInterface object or None for PF
414
        :return: Number of RSS queues
415
        """
NEW
UNCOV
416
        logger.log(
×
417
            level=log_levels.MFD_DEBUG,
418
            msg=f"Retrieving RSS queues count of interface {self.name if vf is None else vf.name}",
419
        )
NEW
UNCOV
420
        name = self.name
×
NEW
UNCOV
421
        vf_path = ""
×
NEW
UNCOV
422
        if vf is not None:
×
NEW
UNCOV
423
            vf_num = str(self.virtualization.get_vf_id_by_pci(vf.pci_address))
×
NEW
UNCOV
424
            vf_path = f"virtfn{vf_num}/"
×
NEW
UNCOV
425
        out = self._connection.execute_command(
×
426
            f"cat /sys/class/net/{name}/device/{vf_path}rss_lut_pf_attr",
427
            expected_return_codes={0},
428
        ).stdout
NEW
UNCOV
429
        return int(out)
×
430

431
    def set_sriov_drivers_autoprobe(self, state: bool) -> None:
8✔
432
        """
433
        Enable or disable SRIOV drivers auto probe.
434

435
        :param state: State to set (True or False)
436
        :return: None
437
        """
438
        logger.log(
8✔
439
            level=log_levels.MFD_DEBUG,
440
            msg=f"{'Enabling' if state else 'Disabling'} sriov_drivers_autoprobe on interface {self.name}",
441
        )
442
        self._connection.execute_command(
8✔
443
            f"echo {int(state)} > /sys/class/net/{self.name}/device/sriov_drivers_autoprobe",
444
            custom_exception=NetworkAdapterConfigurationException,
445
        )
446
        status = "enabled" if state else "disabled"
8✔
447
        logger.log(
8✔
448
            level=log_levels.MFD_INFO,
449
            msg=f"Successfully {status} sriov_drivers_autoprobe on interface {self.name}",
450
        )
451

452
    def restart(self) -> None:
8✔
453
        """Restart interface."""
UNCOV
454
        raise NotImplementedError
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc