• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

intel / mfd-network-adapter / 18743588143

23 Oct 2025 09:16AM UTC coverage: 92.781% (-0.08%) from 92.86%
18743588143

Pull #27

github

web-flow
Merge b8d6439b7 into ccf36e51c
Pull Request #27: feat: Add devlink commands and RSS queues settings

60 of 70 new or added lines in 3 files covered. (85.71%)

38 existing lines in 4 files now uncovered.

8496 of 9157 relevant lines covered (92.78%)

3.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.81
/mfd_network_adapter/network_interface/feature/virtualization/linux.py
1
# Copyright (C) 2025 Intel Corporation
2
# SPDX-License-Identifier: MIT
3
"""Module for Virtualization feature for Linux."""
4✔
4

5
import logging
4✔
6
import re
4✔
7
from typing import List
4✔
8

9
from mfd_common_libs import add_logging_level, log_levels
4✔
10
from mfd_const.network import DESIGNED_NUMBER_VFS_BY_SPEED, Speed
4✔
11
from mfd_typing import MACAddress, DeviceID, SubDeviceID, PCIAddress
4✔
12
from mfd_typing.network_interface import InterfaceType
4✔
13

14
from mfd_network_adapter.data_structures import State
4✔
15
from mfd_network_adapter.exceptions import VirtualFunctionNotFoundException
4✔
16
from .base import BaseFeatureVirtualization
4✔
17
from ...data_structures import VlanProto, VFDetail, LinkState
4✔
18
from ...exceptions import VirtualizationFeatureException, VirtualizationWrongInterfaceException, DeviceSetupException
4✔
19

20
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
4✔
21
# MODULE_DEBUG is the level passed to the logger.log() calls in this module.
# NOTE(review): presumably add_logging_level registers that level with `logging` - confirm in mfd_common_libs.
add_logging_level(level_name="MODULE_DEBUG", level_value=log_levels.MODULE_DEBUG)
4✔
22

23

24
class LinuxVirtualization(BaseFeatureVirtualization):
4✔
25
    """Linux class for Virtualization feature."""
26

27
    def _raise_error_if_not_supported_type(self) -> None:
        """
        Verify that the owning interface is of PF or BTS type.

        :raises VirtualizationWrongInterfaceException: if the interface type is neither PF nor BTS
        """
        iface_type = self._interface().interface_type
        if iface_type in (InterfaceType.PF, InterfaceType.BTS):
            return

        message = f"Current interface type is {iface_type} but should be InterfaceType.PF or InterfaceType.BTS"
        raise VirtualizationWrongInterfaceException(message)
38

39
    def _get_vfs_details(self) -> List[VFDetail]:
        """
        Collect per-VF settings parsed from ``ip link show dev <name>`` output.

        For every VF entry matched in the output, the VF ID, MAC address, spoof checking,
        link-state and trust values are extracted.

        :raises VirtualizationWrongInterfaceException: if method is called on non PF/BTS interface
        :raises VirtualizationFeatureException: in case of command failure (passed as custom_exception)
        :return: List of VFDetail objects, one per matched VF entry
        """
        self._raise_error_if_not_supported_type()

        vf_entry_regex = (
            r"vf\s*(?P<vf_id>\d+)\s*link/ether\s*(?P<mac_address>[0-9a-fA-F]{2}(?::[0-9a-fA-F]{2}){5})\s*.*?,"
            r"\s*spoof checking\s*(?P<spoofchk>\w+),\s*link-state\s*(?P<link_state>\w+),\s*trust\s*("
            r"?P<trust>\w+)"
        )
        # Unknown link-state words map to None (dict.get default).
        link_states = {"enable": LinkState.ENABLE, "disable": LinkState.DISABLE, "auto": LinkState.AUTO}

        def to_state(flag: str) -> State:
            # Anything other than the literal "on" is treated as disabled.
            return State.ENABLED if flag == "on" else State.DISABLED

        stdout = self._connection.execute_command(
            command=f"ip link show dev {self._interface().name}", custom_exception=VirtualizationFeatureException
        ).stdout

        return [
            VFDetail(
                id=int(entry.group("vf_id")),
                mac_address=MACAddress(entry.group("mac_address")),
                spoofchk=to_state(entry.group("spoofchk")),
                link_state=link_states.get(entry.group("link_state")),
                trust=to_state(entry.group("trust")),
            )
            for entry in re.finditer(vf_entry_regex, stdout)
        ]
4✔
73

74
    def _get_max_vfs_by_name(self) -> int:
        """
        Read the maximal number of VFs for the interface, addressed by its name.

        The value is read from ``/sys/class/net/<name>/device/sriov_totalvfs``.

        :raises VirtualizationWrongInterfaceException: if method is called on non PF/BTS interface
        :raises VirtualizationFeatureException: in case of error
        :return: number of VFs
        """
        self._raise_error_if_not_supported_type()
        sysfs_path = f"/sys/class/net/{self._interface().name}/device/sriov_totalvfs"
        stdout = self._connection.execute_command(
            command=f"cat {sysfs_path}", custom_exception=VirtualizationFeatureException
        ).stdout
        return int(stdout)
4✔
87

88
    def _get_max_vfs_by_pci_address(self) -> int:
        """
        Read the maximal number of VFs for the interface, addressed by its PCI address.

        The value is read from ``/sys/bus/pci/devices/<pci_address>/sriov_totalvfs``.

        :raises VirtualizationWrongInterfaceException: if method is called on non PF/BTS interface
        :raises VirtualizationFeatureException: in case of error
        :return: number of VFs
        """
        self._raise_error_if_not_supported_type()
        sysfs_path = f"/sys/bus/pci/devices/{self._interface().pci_address}/sriov_totalvfs"
        stdout = self._connection.execute_command(
            command=f"cat {sysfs_path}", custom_exception=VirtualizationFeatureException
        ).stdout
        return int(stdout)
4✔
101

102
    def _get_current_vfs_by_name(self) -> int:
        """
        Read the current number of VFs for the interface, addressed by its name.

        The value is read from ``/sys/class/net/<name>/device/sriov_numvfs``.

        :raises VirtualizationWrongInterfaceException: if method is called on non PF/BTS interface
        :raises VirtualizationFeatureException: in case of error
        :return: number of VFs
        """
        self._raise_error_if_not_supported_type()
        sysfs_path = f"/sys/class/net/{self._interface().name}/device/sriov_numvfs"
        stdout = self._connection.execute_command(
            command=f"cat {sysfs_path}", custom_exception=VirtualizationFeatureException
        ).stdout
        return int(stdout)
4✔
115

116
    def _get_current_vfs_by_pci_address(self) -> int:
        """
        Read the current number of VFs for the interface, addressed by its PCI address.

        The value is read from ``/sys/bus/pci/devices/<pci_address>/sriov_numvfs``.

        :raises VirtualizationWrongInterfaceException: if method is called on non PF/BTS interface
        :raises VirtualizationFeatureException: in case of error
        :return: number of VFs
        """
        self._raise_error_if_not_supported_type()
        sysfs_path = f"/sys/bus/pci/devices/{self._interface().pci_address}/sriov_numvfs"
        stdout = self._connection.execute_command(
            command=f"cat {sysfs_path}", custom_exception=VirtualizationFeatureException
        ).stdout
        return int(stdout)
4✔
129

130
    def get_max_vfs(self) -> int:
        """
        Get maximal number of VFs per interface.

        The lookup goes through the interface name when it is set, otherwise through the PCI address.

        :return: maximal number of VFs
        """
        if self._interface().name:
            return self._get_max_vfs_by_name()
        return self._get_max_vfs_by_pci_address()
4✔
133

134
    def get_current_vfs(self) -> int:
        """
        Get number of current VFs per interface.

        The lookup goes through the interface name when it is set, otherwise through the PCI address.

        :return: current number of VFs
        """
        if self._interface().name:
            return self._get_current_vfs_by_name()
        return self._get_current_vfs_by_pci_address()
4✔
137

138
    def get_designed_number_vfs(self) -> tuple[int, int]:
        """
        Return designed max number of VFs, total and per PF.

        For 1G/10G/40G/200G interfaces, and for 100G interfaces other than the one special-cased below,
        the total is taken from DESIGNED_NUMBER_VFS_BY_SPEED and the per-PF value is that total divided
        by the number of ports. The 100G device 0x1592 with sub-device 0x000E (logged as Chapman Beach)
        uses fixed values: 512 in total and 256 per PF.

        :raises DeviceSetupException: if the interface speed is none of the handled speeds
        :return: max VFs for NIC, max VFS per PF
        """
        speed = self._interface().speed
        if speed in (Speed.G1, Speed.G10, Speed.G40, Speed.G200):
            return DESIGNED_NUMBER_VFS_BY_SPEED[speed], int(
                DESIGNED_NUMBER_VFS_BY_SPEED[speed] / self._interface().get_number_of_ports()
            )
        elif speed is Speed.G100:
            device_id = self._interface().pci_device.device_id
            sub_device_id = self._interface().pci_device.sub_device_id
            if device_id == DeviceID(0x1592) and sub_device_id == SubDeviceID(0x000E):
                designed_max_vfs = 512
                designed_max_vfs_per_pf = 256
                logger.log(
                    level=log_levels.MODULE_DEBUG,
                    msg=(
                        # Trailing space keeps the two concatenated sentences separated in the log.
                        "Recognized Chapman Beach (2 chips on NIC). "
                        f"Return designed max VFS: {designed_max_vfs} and per PF: {designed_max_vfs_per_pf}"
                    ),
                )
                return designed_max_vfs, designed_max_vfs_per_pf
            else:
                return DESIGNED_NUMBER_VFS_BY_SPEED[Speed.G100], int(
                    DESIGNED_NUMBER_VFS_BY_SPEED[Speed.G100] / self._interface().get_number_of_ports()
                )
        else:
            raise DeviceSetupException(
                "Cannot recognize NIC. \n"
                f"Device string: {self._interface().get_device_string()}.\n"
                f"Device ID: {self._interface().pci_device.device_id}.\n"
                f"Sub Device ID: {self._interface().pci_device.sub_device_id}.\n"
            )
174

175
    def set_sriov(self, sriov_enabled: bool, no_restart: bool = False) -> None:
        """
        Set network interface SRIOV - not implemented for Linux.

        :param sriov_enabled: adapter SRIOV status value to be set.
        :param no_restart: whether to restart adapter after changing its settings.
        :raises NotImplementedError: always
        """
        raise NotImplementedError()
×
183

184
    def set_vmq(self, vmq_enabled: bool, no_restart: bool = False) -> None:
        """
        Set network interface VMQ - not implemented for Linux.

        :param vmq_enabled: adapter VMQ status value to be set.
        :param no_restart: whether to restart adapter after changing its settings.
        :raises NotImplementedError: always
        """
        raise NotImplementedError()
×
192

193
    def set_max_tx_rate(self, vf_id: int, value: int) -> None:
        """
        Set the max_tx_rate parameter of a VF through ``ip link set``.

        :param vf_id: VF (virtual function) ID
        :param value: max_tx_rate value (Mbits)
        :raises VirtualizationWrongInterfaceException: if method is called on non PF/BTS interface
        :raises VirtualizationFeatureException: if command execution fails
        """
        self._raise_error_if_not_supported_type()

        pf_name = self._interface().name
        cmd = f"ip link set dev {pf_name} vf {vf_id} max_tx_rate {value}"
        logger.log(level=log_levels.MODULE_DEBUG, msg=f"Set max tx rate using : {cmd}")
        self._connection.execute_command(command=cmd, custom_exception=VirtualizationFeatureException)
4✔
207

208
    def set_min_tx_rate(self, vf_id: int, value: int) -> None:
        """
        Set the min_tx_rate parameter of a VF through ``ip link set``.

        :param vf_id: VF (virtual function) ID
        :param value: min_tx_rate value (Mbits)
        :raises VirtualizationWrongInterfaceException: if method is called on non PF/BTS interface
        :raises VirtualizationFeatureException: if command execution fails
        """
        self._raise_error_if_not_supported_type()

        pf_name = self._interface().name
        cmd = f"ip link set dev {pf_name} vf {vf_id} min_tx_rate {value}"
        logger.log(level=log_levels.MODULE_DEBUG, msg=f"Set min tx rate using : {cmd}")
        self._connection.execute_command(command=cmd, custom_exception=VirtualizationFeatureException)
4✔
221

222
    def set_trust(self, vf_id: int, state: State) -> None:
        """
        Switch the 'trust' setting of a VF on or off.

        :param vf_id: Virtual Function ID
        :param state: State to be set: State.ENABLED maps to 'on', any other value to 'off'
        :raises VirtualizationWrongInterfaceException: if method is called on non PF/BTS interface
        :raises VirtualizationFeatureException: in case of command error
        """
        self._raise_error_if_not_supported_type()

        if state == State.ENABLED:
            value = "on"
        else:
            value = "off"

        pf_name = self._interface().name
        cmd = f"ip link set {pf_name} vf {vf_id} trust {value}"
        logger.log(level=log_levels.MODULE_DEBUG, msg=f"Set trust on VF ID: {vf_id} to '{value}'")
        self._connection.execute_command(command=cmd, custom_exception=VirtualizationFeatureException)
4✔
236

237
    def set_spoofchk(self, vf_id: int, state: State) -> None:
        """
        Switch spoof checking of a VF on or off.

        Turning off spoofchk allows the VF to change its MAC address.

        :param vf_id: Virtual Function ID
        :param state: State to be set: State.ENABLED maps to 'on', any other value to 'off'
        :raises VirtualizationWrongInterfaceException: if method is called on non PF/BTS interface
        :raises VirtualizationFeatureException: in case of error
        """
        self._raise_error_if_not_supported_type()

        if state == State.ENABLED:
            value = "on"
        else:
            value = "off"

        pf_name = self._interface().name
        cmd = f"ip link set {pf_name} vf {vf_id} spoofchk {value}"
        logger.log(level=log_levels.MODULE_DEBUG, msg=f"Set spoofchk on VF ID: {vf_id} to '{value}'")
        self._connection.execute_command(command=cmd, custom_exception=VirtualizationFeatureException)
4✔
252

253
    def get_trust(self, vf_id: int) -> State:
        """
        Look up the trust setting of a single VF.

        :param vf_id: Virtual Function ID
        :raises VirtualizationWrongInterfaceException: if method is called on non PF/BTS interface
        :return: State of trust setting; None when no VF with the given ID is reported
        """
        matching = (vf.trust for vf in self._get_vfs_details() if vf_id == vf.id)
        return next(matching, None)
4✔
264

265
    def get_spoofchk(self, vf_id: int) -> State:
        """
        Look up the spoofchk setting of a single VF.

        :param vf_id: Virtual Function ID
        :raises VirtualizationWrongInterfaceException: if method is called on non PF/BTS interface
        :return: State of spoofchk setting; None when no VF with the given ID is reported
        """
        matching = (vf.spoofchk for vf in self._get_vfs_details() if vf_id == vf.id)
        return next(matching, None)
4✔
276

277
    def get_mac_address(self, vf_id: int) -> MACAddress:
        """
        Look up the MAC address of a single VF.

        :param vf_id: Virtual Function ID
        :raises VirtualizationWrongInterfaceException: if method is called on non PF/BTS interface
        :return: Mac Address of VF; None when no VF with the given ID is reported
        """
        matching = (vf.mac_address for vf in self._get_vfs_details() if vf_id == vf.id)
        return next(matching, None)
4✔
288

289
    def get_link_state(self, vf_id: int) -> LinkState:
        """
        Look up the link-state setting of a single VF.

        :param vf_id: Virtual Function ID
        :raises VirtualizationWrongInterfaceException: if method is called on non PF/BTS interface
        :return: Link State setting of VF; None when no VF with the given ID is reported
        """
        matching = (vf.link_state for vf in self._get_vfs_details() if vf_id == vf.id)
        return next(matching, None)
4✔
300

301
    def set_vlan_for_vf(self, vf_id: int, vlan_id: int, proto: VlanProto = None) -> None:
        """
        Assign a port VLAN to a VF interface.

        :param vf_id: Virtual Function ID
        :param vlan_id: VLAN to set the VF to
        :param proto: Specify 802.1ad or 802.1Q type of VLAN; omitted from the command when not given
        :raises VirtualizationWrongInterfaceException: if method is called on non PF/BTS interface
        :raises VirtualizationFeatureException: in case of error
        """
        self._raise_error_if_not_supported_type()

        cmd = f"ip link set {self._interface().name} vf {vf_id} vlan {vlan_id}"
        if proto:
            cmd += f" proto {proto.value}"
        self._connection.execute_command(command=cmd, custom_exception=VirtualizationFeatureException)
4✔
315

316
    def set_link_for_vf(self, vf_id: int, link_state: LinkState) -> None:
        """
        Apply the requested link state to a VF interface.

        :param vf_id: Virtual Function ID
        :param link_state: requested link state (one of auto, enabled, disabled)
        :raises VirtualizationWrongInterfaceException: if method is called on non PF/BTS interface
        :raises VirtualizationFeatureException: in case of error
        """
        self._raise_error_if_not_supported_type()

        pf_name = self._interface().name
        requested_state = link_state.value
        self._connection.execute_command(
            command=f"ip link set {pf_name} vf {vf_id} state {requested_state}",
            custom_exception=VirtualizationFeatureException,
        )
4✔
328

329
    def set_mac_for_vf(self, vf_id: int, mac: MACAddress) -> None:
        """
        Set MAC address for VF interface.

        Deprecated: a warning pointing to `owner.mac.set_mac_for_vf()` is logged on every call,
        before the interface type is checked.

        :param vf_id: Virtual Function ID
        :param mac: MAC to set on VF interface
        :raises VirtualizationWrongInterfaceException: if method is called on non PF/BTS interface
        :raises VirtualizationFeatureException: in case of error
        """
        logger.warning(
            "This API is deprecated - `interface.virtualization.set_mac_for_vf()`. "
            "Use `owner.mac.set_mac_for_vf()` instead."
        )
        self._raise_error_if_not_supported_type()
        cmd = f"ip link set {self._interface().name} vf {vf_id} mac {mac}"
        self._connection.execute_command(command=cmd, custom_exception=VirtualizationFeatureException)
4✔
343

344
    def get_vf_id_by_pci(self, vf_pci_address: PCIAddress) -> int:
        """
        Get ID of VF with the given PCI address on specific PF PCI address using /sys/bus/pci/devices/pci_address.

        The ``virtfn<N>`` entries of the PF are listed and the one whose link target ends with
        ``vf_pci_address`` provides the VF ID.

        :param vf_pci_address: VF interface PCI address.
        :raises VirtualFunctionNotFoundException: if listing the virtfn entries returns a non-zero code
            or no entry points at the given VF PCI address.
        :return: ID of the VF.
        """
        pf_pci_address = self._interface().pci_address
        result = self._connection.execute_command(
            f"ls /sys/bus/pci/devices/{pf_pci_address}/virtfn* -la",
            shell=True,
            expected_return_codes={0, 1},
        )
        if result.return_code != 0:
            raise VirtualFunctionNotFoundException(
                f"Failed to list VFs for PF PCI Address {pf_pci_address}: {result.stderr.strip()}"
            )

        # PCI addresses contain '.', which is a regex wildcard - escape both so they only match literally.
        pf_literal = re.escape(f"{pf_pci_address}")
        vf_literal = re.escape(f"{vf_pci_address}")
        vf_number_regex = rf"^.*devices/{pf_literal}/virtfn(?P<vf_number>\d+).*->.*{vf_literal}$"

        # re.M lets ^/$ anchor on each line of the listing.
        match = re.search(vf_number_regex, result.stdout, re.M)
        if match is None:
            raise VirtualFunctionNotFoundException(f"0 matched VFs for PF PCI Address {pf_pci_address}")
        return int(match.group("vf_number"))
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc