• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

intel / mfd-cli-client / 23151258070

16 Mar 2026 03:20PM UTC coverage: 88.889% (+0.4%) from 88.509%
23151258070

Pull #4

github

web-flow
Merge c2d4d90ec into 1086f8243
Pull Request #4: Feature/delete mirror profile

11 of 12 new or added lines in 1 file covered. (91.67%)

33 existing lines in 1 file now uncovered.

296 of 333 relevant lines covered (88.89%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.69
/mfd_cli_client/base.py
1
# Copyright (C) 2025 Intel Corporation
2
# SPDX-License-Identifier: MIT
3
"""Module for command line interface client."""
1✔
4

5
import logging
1✔
6
import re
1✔
7
from dataclasses import dataclass
1✔
8
from pathlib import Path
1✔
9
from time import sleep
1✔
10
from typing import Optional, Iterable, Dict, Union, List
1✔
11
from enum import IntEnum
1✔
12

13
from mfd_common_libs import add_logging_level, log_levels, os_supported
1✔
14
from mfd_base_tool import ToolTemplate
1✔
15
from mfd_typing import OSName, MACAddress
1✔
16

17
from .exceptions import CliClientException, CliClientNotAvailable
1✔
18

19
logger = logging.getLogger(__name__)
1✔
20
add_logging_level("MODULE_DEBUG", log_levels.MODULE_DEBUG)
1✔
21

22

23
@dataclass
class FlowStats:
    """Structure for single direction statistics."""

    # Per-traffic-class packet counters; get_switch_stats fills one slot per traffic class index.
    traffic_class_counters: List[int]
    # Packet counter parsed from the "<direction> packet:" line of the switch statistics.
    packet: int
    # Counter parsed from the "<direction> discards packet:" line of the switch statistics.
    discards: int
1✔
30

31

32
@dataclass
class SwitchStats:
    """Structure for both directions statistics."""

    # Statistics of the egress direction.
    egress: FlowStats
    # Statistics of the ingress direction.
    ingress: FlowStats
    # Counter parsed from the "unicast packet:" line.
    unicast_packet: int
    # Counter parsed from the "multicast packet:" line.
    multicast_packet: int
    # Counter parsed from the "broadcast packet:" line.
    broadcast_packet: int
1✔
41

42

43
@dataclass
class VSIFlowStats:
    """Structure for VSI statistics."""

    # NOTE(review): get_vsi_statistics stores None in any of these fields when the
    # corresponding counter is absent from the tool output, despite the ``int`` annotations.
    packet: int
    unicast_packet: int
    multicast_packet: int
    broadcast_packet: int
    discards_packet: int
    errors_packet: int
    # Optional counter; defaults to None when not supplied.
    unknown_packet: int | None = None
1✔
54

55

56
@dataclass
class VSIStats:
    """Structure for both directions VSI statistics."""

    # Counters of the ingress direction.
    ingress: VSIFlowStats
    # Counters of the egress direction.
    egress: VSIFlowStats
1✔
62

63

64
@dataclass
class TrafficClassCounters:
    """Structure for both directions Traffic Classes Counter."""

    # Per-traffic-class counters taken from the egress switch statistics.
    tx: List[int]
    # Per-traffic-class counters taken from the ingress switch statistics.
    rx: List[int]
1✔
70

71

72
@dataclass
class VsiListEntry:
    """Structure for an entry in VSI list containing VSI ID and MAC address."""

    # VSI identifier.
    vsi_id: int
    # MAC address reported for that VSI.
    mac: MACAddress
1✔
78

79

80
@dataclass
class VsiConfigListEntry:
    """Structure for an entry in VSI Config list containing all fields."""

    # Numeric fields are parsed by get_vsi_config_list from hexadecimal text;
    # boolean fields come from "yes"/"no" values in the tool output.
    fn_id: int
    host_id: int
    is_vf: bool
    vsi_id: int
    vport_id: int
    is_created: bool
    is_enabled: bool
    # MAC address taken from the "mac addr:" field.
    mac: MACAddress
1✔
92

93

94
class LinkStatus(IntEnum):
    """Link Status enum represents link state."""

    # Members are looked up by upper-cased name ('up'/'down') when building link change commands.
    DOWN = 0
    UP = 1
1✔
99

100

101
class CliClient(ToolTemplate):
1✔
102
    """Module for command line interface client tool."""
103

104
    tool_executable_name = "cli_client"
1✔
105
    ALL_USER_PRIORITY_TRAFFIC_CLASS = 8
1✔
106
    _MIRROR_PROFILE_SUCCESS_MARKERS = (
1✔
107
        "command succeeded",
108
        "random mirror profile set",
109
    )
110

111
    __init__ = os_supported(OSName.LINUX)(ToolTemplate.__init__)
1✔
112

113
    def _get_tool_exec_factory(self) -> str:
        """
        Provide the executable name used to invoke the tool.

        :return: Value of the ``tool_executable_name`` attribute.
        """
        executable = self.tool_executable_name
        return executable
×
116

117
    def _apply_config_changes(self, module: str, success_val: str, config_file_path: Union[Path, str]) -> None:
        """
        Push a QoS configuration file (VMRL, TUPRL, etc.) to a module through cli_client.

        :param module: Module to modify, passed to the ``-C`` option.
        :param success_val: Text that must appear in the lower-cased command output on success.
        :param config_file_path: Path to config file, passed to the ``-f`` option.
        :raises CliClientException: when the success text is not found in the output.
        """
        debug_level = log_levels.MODULE_DEBUG
        logger.log(level=debug_level, msg="Apply the CP configuration changes.")

        cli_output = self.execute_cli_client_command(command=f"-b qos -m -C {module} -f {config_file_path}")
        if success_val not in cli_output.lower():
            raise CliClientException(f"Configure and update {module} failed.")

        logger.log(level=debug_level, msg=f"Configure and update {module} passed.")
        logger.log(level=debug_level, msg=f"Wait 10 sec for update {config_file_path} file.")
        # Fixed pause after a successful apply, announced by the log message above.
        sleep(10)
1✔
134

135
    def check_if_available(self) -> None:
        """
        Verify the tool can be started by running it with the ``-h`` option.

        :raises CliClientNotAvailable: when tool not available.
        """
        help_command = f"{self._tool_exec} -h"
        self._connection.execute_command(help_command, custom_exception=CliClientNotAvailable)
×
142

143
    def execute_cli_client_command(
        self, command: str, *, timeout: int = 120, expected_return_codes: Iterable = frozenset({0})
    ) -> str:
        """
        Run the command line interface client tool with the given arguments.

        :param command: Arguments appended after the tool executable.
        :param timeout: Maximum wait time for command to execute.
        :param expected_return_codes: Return codes to be considered acceptable.
        :return: Standard output of the command, for the caller to verify.
        """
        full_command = f"{self._tool_exec} {command}"
        result = self._connection.execute_command(
            full_command, timeout=timeout, expected_return_codes=expected_return_codes
        )
        return result.stdout
1✔
159

160
    def get_version(self) -> Optional[str]:
        """
        Report the tool version.

        The tool exposes no version information, so a placeholder is returned.

        :return: Always the string "N/A".
        """
        message = f"Tool version is not available for {self.tool_executable_name}"
        logger.log(level=log_levels.MODULE_DEBUG, msg=message)
        return "N/A"
×
168

169
    def get_switch_stats(self, switch_id: int = 1) -> SwitchStats:
        """
        Get command line interface client switch stats.

        Every counter that cannot be found in the tool output is reported as 0.

        :param switch_id: switch ID
        :return: Stats for both directions
        """
        command = f"--query --statistics --switch {switch_id}"
        # w/a because the first execution of this command never shows refreshed stats.
        self.execute_cli_client_command(command=command)
        output = self.execute_cli_client_command(command=command)

        def _counter(regex: str) -> int:
            """Return the ``counter`` group of the first match of regex in the output, or 0 when absent."""
            match = re.search(regex, output)
            return int(match.group("counter")) if match else 0

        def _flow_stats(direction: str) -> FlowStats:
            """Collect per-traffic-class, packet and discards counters of one direction."""
            traffic_classes_packet_counter = [
                _counter(rf"{direction}\stc\s{traffic_class}\spacket\scounter:\s(?P<counter>\d+)")
                for traffic_class in range(self.ALL_USER_PRIORITY_TRAFFIC_CLASS)
            ]
            packet_counter = _counter(rf"{direction}\spacket:\s(?P<counter>\d+)\sbytes")
            discards_counter = _counter(rf"{direction}\sdiscards\spacket:\s(?P<counter>\d+)\sbytes")
            return FlowStats(traffic_classes_packet_counter, packet_counter, discards_counter)

        return SwitchStats(
            egress=_flow_stats("egress"),
            ingress=_flow_stats("ingress"),
            unicast_packet=_counter(r"unicast\spacket:\s(?P<counter>\d+)\sbytes"),
            multicast_packet=_counter(r"multicast\spacket:\s(?P<counter>\d+)\sbytes"),
            broadcast_packet=_counter(r"broadcast\spacket:\s(?P<counter>\d+)\sbytes"),
        )
1✔
224

225
    def get_vsi_statistics(self, vsi_id: int = 1) -> VSIStats:
        """
        Query the VSI statistics and parse the counters of both directions.

        A counter that is missing from the tool output is stored as None.

        :param vsi_id: VSI ID
        :return: Stats for both directions
        """
        command = f"--query --statistics --vsi {vsi_id}"
        # w/a because the first execution of this command never shows refreshed stats.
        self.execute_cli_client_command(command=command)
        output = self.execute_cli_client_command(command=command)

        per_direction = {}
        for direction in ("ingress", "egress"):
            patterns = {
                "packet": rf"{direction} packet: (?P<counter>\d+)",
                "unicast_packet": rf"{direction} unicast packet: (?P<counter>\d+)",
                "multicast_packet": rf"{direction} multicast packet: (?P<counter>\d+)",
                "broadcast_packet": rf"{direction} broadcast packet: (?P<counter>\d+)",
                "discards_packet": rf"{direction} discards packet: (?P<counter>\d+)",
                "errors_packet": rf"{direction} errors packet: (?P<counter>\d+)",
                "unknown_packet": rf"{direction} unknown packet: (?P<counter>\d+)",
            }
            counters = {}
            for field_name, regex in patterns.items():
                found = re.search(regex, output)
                counters[field_name] = None if found is None else int(found.group("counter"))
            per_direction[direction] = VSIFlowStats(**counters)

        return VSIStats(per_direction["ingress"], per_direction["egress"])
1✔
260

261
    def add_group_vf2vm(self, psm_vf2vm: Dict[int, List[int]]) -> None:
        """Build a complete vf2vm topology in PSM from a dictionary.

        All VM nodes are created first; only then are the VFs mapped to them.

        :param psm_vf2vm: Dictionary of VMs to create and list of VFs to assign to VMs.
        :type psm_vf2vm: Dict[int, List[int]]
        :raises CliClientException: on failure
        """
        debug_level = log_levels.MODULE_DEBUG

        for vm_node in psm_vf2vm:
            logger.log(level=debug_level, msg=f"Creating PSM VM node {vm_node}")
            self.add_psm_vm_node(vm_id=vm_node)

        for vm_node, vf_list in psm_vf2vm.items():
            for vf_index in vf_list:
                logger.log(level=debug_level, msg=f"Mapping VF: {vf_index} to VM node: {vm_node}")
                self.add_vf_to_vm_node(vm_id=vm_node, vf_id=vf_index)
1✔
276

277
    def add_psm_vm_node(self, vm_id: Union[int, str] = 1) -> None:
        """
        Add a VM node in the LAN PSM/Work Scheduler tree.

        :param vm_id: VM node id/index. If hex string, then hex string is sent to cli_client.
        :raises CliClientException: when a string vm_id is not valid hex or the command does not report success.
        """
        if isinstance(vm_id, str):
            # Strings are only validated as hex; the original text is what gets sent to the tool.
            try:
                int(vm_id, 16)
            except ValueError as parse_error:
                raise CliClientException("Cannot parse int from hex string") from parse_error

        output = self.execute_cli_client_command(command=f"-b psm -m -c -H 0 --vmid {vm_id}")
        if "command succeeded" in output.lower():
            logger.log(level=log_levels.MODULE_DEBUG, msg=f"Successfully add PSM VM node id: {vm_id}.")
        else:
            raise CliClientException(f"Error adding PSM VM node id: {vm_id}")
×
295

296
    def add_vf_to_vm_node(self, vf_id: Union[int, str] = 0, vm_id: Union[int, str] = 1) -> None:
        """
        Add a VF to a VM node in the LAN PSM/Work Scheduler tree.

        :param vf_id: VF node id/index. If hex string, hex is sent to cli_client command.
        :param vm_id: VM node id/index. If hex string, hex is sent to cli_client command.
        :raises CliClientException: when a string id is not valid hex or the command does not report success.
        """
        # vm_id is validated before vf_id; strings are only checked, the original text is sent to the tool.
        for node_id in (vm_id, vf_id):
            if isinstance(node_id, str):
                try:
                    int(node_id, 16)
                except ValueError as parse_error:
                    raise CliClientException("Cannot parse int from hex string") from parse_error

        output = self.execute_cli_client_command(command=f"-b psm -m -c -H 0 --vfid {vf_id} --vmid {vm_id}")
        if "command succeeded" in output.lower():
            logger.log(level=log_levels.MODULE_DEBUG, msg=f"Successfully add VF {vf_id} to VM node id {vm_id}.")
        else:
            raise CliClientException(f"Error adding VF {vf_id} to VM node id {vm_id}")
×
319

320
    def prepare_vm_vsi(self, vf_amount: Union[int, str] = 1) -> None:
        """
        Pick a VM ID for each VM and associate it to the host.

        VF ``n`` (counted from 0) is mapped to VM node ``n + 1``.

        :param vf_amount: Number of VFs. If hex string, hex is used in cli_client command.
        :raises CliClientException: when a string vf_amount is not valid hex or any command fails.
        """
        use_hex = isinstance(vf_amount, str)
        if use_hex:
            try:
                vf_count = int(vf_amount, 16)
            except ValueError as parse_error:
                raise CliClientException("Cannot parse int from hex string") from parse_error
        else:
            vf_count = vf_amount

        # Start vm nodes at 1
        for vf_index in range(vf_count):
            vm_index = vf_index + 1
            logger.log(level=log_levels.MODULE_DEBUG, msg=f"Mapping vf_id: {vf_index} to vm node: {vm_index}")
            if use_hex:
                self.add_psm_vm_node(vm_id=hex(vm_index))
                self.add_vf_to_vm_node(vf_id=hex(vf_index), vm_id=hex(vm_index))
            else:
                self.add_psm_vm_node(vm_id=vm_index)
                self.add_vf_to_vm_node(vf_id=vf_index, vm_id=vm_index)
1✔
345

346
    def find_vf_vsi(self, vf_amount: int = 1) -> Dict[str, str]:
        """
        Collect the VSI ids of the first VFs found in the VSI config list.

        :param vf_amount: Number of VFs to look for.
        :return: dict mapping VF function id to VSI id, both as hex text without the ``0x`` prefix.
        """
        logger.log(level=log_levels.MODULE_DEBUG, msg="Find VFs VSIs.")
        vf_to_vsi = {}
        matched = 0

        for entry in self.get_vsi_config_list():
            if matched == vf_amount:
                break
            if entry.is_vf is False:
                continue
            vf_to_vsi[f"{entry.fn_id:x}"] = f"{entry.vsi_id:x}"
            matched += 1

        return vf_to_vsi
1✔
369

370
    def get_mac_and_vsi_list(self) -> List[VsiListEntry]:
        """
        Reduce the VSI config list to VSI ID / MAC address pairs.

        :return: list with entries from VSI list containing VSI ID and MAC address
        """
        return [VsiListEntry(entry.vsi_id, entry.mac) for entry in self.get_vsi_config_list()]
1✔
383

384
    def get_vsi_config_list(self) -> List[VsiConfigListEntry]:
        """
        Query the verbose configuration and parse every VSI entry from it.

        Numeric fields are read as hexadecimal; "yes"/"no" fields become booleans.

        :return: list with entries from VSI list containing all fields in output
        """
        output = self.execute_cli_client_command(command="--query --config --verbose")
        pattern = re.compile(
            r"fn_id:\s(?P<fn_id>\w+).*host_id:\s(?P<host_id>\w+).*is_vf:\s(?P<is_vf>(no|yes)).*vsi_id:\s(?P"
            r"<vsi_id>\w+).*vport_id\s(?P<vport_id>\w+).*is_created:\s(?P<is_created>(no|yes)).*is_enabled:"
            r"\s(?P<is_enabled>(no|yes))\smac\saddr:\s(?P<mac>([a-fA-F0-9]{1,2}[:|-]?){6})"
        )

        parsed_entries = []
        for fields in [found.groupdict() for found in pattern.finditer(output)]:
            parsed_entries.append(
                VsiConfigListEntry(
                    fn_id=int(fields["fn_id"], 16),
                    host_id=int(fields["host_id"], 16),
                    is_vf=fields["is_vf"] == "yes",
                    vsi_id=int(fields["vsi_id"], 16),
                    vport_id=int(fields["vport_id"], 16),
                    is_created=fields["is_created"] == "yes",
                    is_enabled=fields["is_enabled"] == "yes",
                    mac=MACAddress(fields["mac"]),
                )
            )

        return parsed_entries
1✔
412

413
    def get_tc_priorities_switch(self, switch_id: int = 1) -> TrafficClassCounters:
        """
        Extract the per-traffic-class counters from the switch stats.

        :param switch_id: switch ID
        :return: Traffic class counters, rx taken from ingress and tx from egress.
        """
        stats = self.get_switch_stats(switch_id)
        rx_counters = stats.ingress.traffic_class_counters
        tx_counters = stats.egress.traffic_class_counters
        return TrafficClassCounters(rx=rx_counters, tx=tx_counters)
424

425
    def apply_up_tc_changes(self, config_file_path: Union[Path, str]) -> None:
        """
        Apply the User Priorities and Traffic Classes configuration changes from file.

        Success is recognised by "file successfully processed" in the tool output.

        :param config_file_path: Path to user priority/traffic classes file
        :raises CliClientException: on failure
        """
        self._apply_config_changes(
            module="TC", success_val="file successfully processed", config_file_path=config_file_path
        )
1✔
433

434
    def apply_tuprl_changes(self, config_file_path: Union[Path, str]) -> None:
        """
        Apply the TUPRL configuration changes from file.

        Success is recognised by "command succeeded" in the tool output.

        :param config_file_path: Path to user qos_tuprl.cfg file
        :raises CliClientException: on failure
        """
        self._apply_config_changes(module="TUPRL", success_val="command succeeded", config_file_path=config_file_path)
1✔
442

443
    def apply_mrl_changes(self, config_file_path: Union[Path, str]) -> None:
        """
        Apply the MRL (Mirror Rate Limit) configuration changes from file.

        Success is recognised by "command succeeded" in the tool output.

        :param config_file_path: Path to user qos_mirr_rl.cfg file
        :raises CliClientException: on failure
        """
        self._apply_config_changes(module="MRL", success_val="command succeeded", config_file_path=config_file_path)
1✔
451

452
    def apply_fxprl_changes(self, config_file_path: Union[Path, str]) -> None:
        """
        Apply the FXP_RL configuration changes from file.

        Success is recognised by "command succeeded" in the tool output.

        :param config_file_path: Path to the FXP_RL configuration file
        :raises CliClientException: on failure
        """
        self._apply_config_changes(module="FXP_RL", success_val="command succeeded", config_file_path=config_file_path)
1✔
460

461
    def apply_vmrl_changes(self, config_file_path: Union[Path, str]) -> None:
        """
        Apply the VMRL (VM Rate Limiter) configuration changes from file.

        Success is recognised by "command succeeded" in the tool output.

        :param config_file_path: Path to user qos_vmrl.cfg file
        :raises CliClientException: on failure
        """
        self._apply_config_changes(module="VMRL", success_val="command succeeded", config_file_path=config_file_path)
1✔
469

470
    def apply_grl_changes(self, config_file_path: Union[Path, str]) -> None:
        """
        Apply the GRL (Global Rate Limiter) configuration changes from file.

        Success is recognised by "command succeeded" in the tool output.

        :param config_file_path: Path to user qos_global_rl.cfg file
        :raises CliClientException: on failure
        """
        self._apply_config_changes(module="GRL", success_val="command succeeded", config_file_path=config_file_path)
1✔
478

479
    def configure_up_up_translation(self, vsi_id: int = 0, different_value: bool = False) -> None:
        """
        Configure UP-UP translation for every user priority in both directions.

        By default each NUP value maps to the same VUP value. With ``different_value`` the mapping is
        reversed: rx maps NUP ``n`` to the mirrored VUP, tx maps the mirrored NUP to VUP ``n``.
        All commands are built first and then executed in order; the first failure stops the run.

        :param vsi_id: vsi id of interface where mapping will be applied
        :param different_value: each NUP value maps to a different VUP value
        :raises CliClientException: on failure
        """
        priorities = range(self.ALL_USER_PRIORITY_TRAFFIC_CLASS)
        commands = []
        for direction in (0, 1):  # 0 - rx, 1 - tx
            for value, rev_val in zip(priorities, reversed(priorities)):
                if not different_value:
                    nup, vup = value, value
                elif direction == 0:
                    nup, vup = value, rev_val
                else:
                    nup, vup = rev_val, value
                commands.append(f"-b qos -m -v {vsi_id} --dir {direction} --nup {nup} --vup {vup}")

        for command in commands:
            cli_output = self.execute_cli_client_command(command=command)
            if "command succeeded" not in cli_output.lower():
                raise CliClientException(f"Configure UP-UP translation ({command}) failed.")
            logger.log(level=log_levels.MODULE_DEBUG, msg=f"Configure UP-UP translation ({command}) passed.")
×
508

509
    def send_link_change_event_all_pf(self, link_status: str, link_speed: str = "200000Mbps") -> None:
        """
        Send a link change event to all pfs.

        :param link_status: Link status ('up' or 'down'), matched case-insensitively.
        :param link_speed: Link speed (one of 100MB,1GB,10GB,40GB,20GB,25GB,2_5GB,5GB,xxxMbps(xxx from 1 to 200000))
        :raises CliClientException: when link_status is not recognised or the command does not report success.
        """
        try:
            status = LinkStatus[link_status.upper()]
        except KeyError as illegal_link:
            raise CliClientException("Link status must be 'up' or 'down'") from illegal_link

        cmd = f"--event link_change --link_status {status} --link_speed {link_speed} --all_pf"
        cli_output = self.execute_cli_client_command(command=cmd)
        if "command succeeded" not in cli_output.lower():
            raise CliClientException(f"Link change ({cmd}) failed.")
        logger.log(level=log_levels.MODULE_DEBUG, msg=f"Link change ({cmd}) passed.")
×
529

530
    def send_link_change_event_per_pf(
        self,
        link_status: str,
        link_speed: str = "200000Mbps",
        pf_num: int = 0,
        vport_id: Optional[int] = None,
    ) -> None:
        """
        Send a link change event with link status and link speed to specified pf and vport.

        :param link_status: Link status ('down' or 'up'), matched case-insensitively.
        :param link_speed: Link speed (one of 100MB,1GB,10GB,40GB,20GB,25GB,2_5GB,5GB,xxxMbps(xxx from 1 to 200000))
        :param pf_num: pf number on which link changes will be applied; sent as hex.
        :param vport_id: vport id on which link changes will be applied; sent as hex. (optional)
                         Only None omits the option, so vport id 0 is sent as ``--vport_id 0x0``.
        :raises CliClientException: when link_status is not recognised or the command does not report success.
        """
        try:
            cmd = f"--event link_change --link_status {LinkStatus[link_status.upper()]} --link_speed {link_speed} "
        except KeyError as illegal_link:
            raise CliClientException("Link status must be 'up' or 'down'") from illegal_link
        cmd += f"--pf_num {hex(pf_num)} "
        # Compare against None explicitly: a plain truthiness test would silently drop vport id 0.
        cmd += f"--vport_id {hex(vport_id)}" if vport_id is not None else ""
        output = self.execute_cli_client_command(command=cmd)
        if "command succeeded" in output.lower():
            logger.log(level=log_levels.MODULE_DEBUG, msg=f"Link change ({cmd}) passed.")
        else:
            raise CliClientException(f"Link change ({cmd}) failed.")
×
557

558
    def create_mirror_profile(self, profile_id: int, vsi_id: int) -> None:
        """
        Create mirror profile to mirror traffic to a specific vsi.

        The command is sent with ``--func_valid``; success is recognised by any of the
        ``_MIRROR_PROFILE_SUCCESS_MARKERS`` appearing in the lower-cased output.

        :param profile_id: Mirror profile id (must be >= 16)
        :param vsi_id: VSI id where packets will be mirrored
        :raises CliClientException: when profile_id is below 16 or no success marker is found.
        """
        if profile_id < 16:
            raise CliClientException(f"Mirror profile id {profile_id} must be >=16. Profiles < 16 are reserved.")

        cmd = f"--modify --config --mir_prof {profile_id} --vsi {vsi_id} --func_valid"
        lowered = self.execute_cli_client_command(command=cmd).lower()
        succeeded = any(marker in lowered for marker in self._MIRROR_PROFILE_SUCCESS_MARKERS)
        if not succeeded:
            raise CliClientException(f"Mirror profile ({cmd}) failed.")
        logger.log(level=log_levels.MODULE_DEBUG, msg=f"Mirror profile ({cmd}) passed.")
1✔
575

576
    def delete_mirror_profile(self, profile_id: int, vsi_id: int) -> None:
        """
        Disable mirror profile by clearing func_valid for a specific profile/vsi mapping.

        The same modify command as for creation is sent, but without ``--func_valid``; success is
        recognised by any of the ``_MIRROR_PROFILE_SUCCESS_MARKERS`` appearing in the lower-cased output.

        :param profile_id: Mirror profile id (must be >= 16)
        :param vsi_id: VSI id currently configured on the mirror profile
        :raises CliClientException: when profile_id is below 16 or no success marker is found.
        """
        if profile_id < 16:
            raise CliClientException(f"Mirror profile id {profile_id} must be >=16. Profiles < 16 are reserved.")

        cmd = f"--modify --config --mir_prof {profile_id} --vsi {vsi_id}"
        lowered = self.execute_cli_client_command(command=cmd).lower()
        succeeded = any(marker in lowered for marker in self._MIRROR_PROFILE_SUCCESS_MARKERS)
        if not succeeded:
            raise CliClientException(f"Mirror profile delete ({cmd}) failed.")
        logger.log(level=log_levels.MODULE_DEBUG, msg=f"Mirror profile delete ({cmd}) passed.")
1✔
593

594
    def add_psm_vm_rl(self, vm_id: Union[int, str] = 1, limit: int = 10000, burst: int = 2048) -> None:
        """
        Add a VM rate limit in the LAN PSM/Work Scheduler tree.

        :param vm_id: VM node id/index. If hex string, then hex string is sent to cli_client.
        :param limit: Rate limit amount, passed to the ``-l`` option.
        :param burst: Burst amount, passed to the ``-u`` option.
        :raises CliClientException: when a string vm_id is not valid hex or the command does not report success.
        """
        if isinstance(vm_id, str):
            # Strings are only validated as hex; the original text is what gets sent to the tool.
            try:
                int(vm_id, 16)
            except ValueError as parse_error:
                raise CliClientException("Cannot parse int from hex string") from parse_error

        output = self.execute_cli_client_command(command=f"-b psm -m -c -H 0 --vmid {vm_id} -l {limit} -u {burst}")
        if "command succeeded" in output.lower():
            logger.log(level=log_levels.MODULE_DEBUG, msg=f"Successfully added {limit} rate limit on vmid: {vm_id}.")
        else:
            raise CliClientException(f"Error adding PSM VM ratelimit on vmid: {vm_id} rate: {limit} burst: {burst}")
×
614

615
    def read_qos_vm_info(self) -> Dict[int, Dict[int, List[int]]]:
        """
        Query, parse and return the VF2VM mapping currently applied in the cp.

        :return: A dictionary of keys hosts, if a host has vms the key is a dict
                 of vms which keys are vfs in that vm.

                 For example if Host 0 has 2 VMs, each having 2 vfs and there
                 is 1 baremetal vf it would return:

                 {0: {1: [0, 1], 2: [2, 3], -1: [4]}, 1: {}, 2: {}, 3: {}}
        :raises CliClientException: when the output lacks the completion text, cannot be parsed,
                                    or does not contain all of the hosts 0-3.
        """
        output = self.execute_cli_client_command(command="--query --statistics --vm_qos_info")
        if "server finished responding" not in output.lower():
            raise CliClientException("cli_client returned unexpected output when querying vm_qos_info")

        data = {}
        host_id = None
        vm_id = None

        try:
            for line in output.split("\n"):
                if "HOST ID" in line:
                    # The id is the last whitespace-separated token of the line.
                    host_id = int(line.split()[-1])
                    data[host_id] = {}
                elif "VM ID" in line:
                    vm_id = int(line.split()[-1])
                    data[host_id][vm_id] = []
                elif "VF ID" in line:
                    # VF ids are a comma-separated list after the last colon; empty items are skipped.
                    vf_ids = line.split(":")[-1].strip().split(",")
                    data[host_id][vm_id] = [int(vfid) for vfid in vf_ids if vfid]
        except (KeyError, ValueError) as parse_error:
            # KeyError: VM/VF line seen before any HOST line; ValueError: non-numeric id.
            raise CliClientException("Error parsing output from vm_qos_info") from parse_error

        if any(key not in data for key in (0, 1, 2, 3)):
            raise CliClientException("Error parsing output from vm_qos_info")

        return data
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc