• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

newAM / monitorcontrol / 15258943961

26 May 2025 05:04PM UTC coverage: 57.508%. Remained the same
15258943961

Pull #387

github

web-flow
Merge 72f9cb0e0 into 1a04d0367
Pull Request #387: ruff: add flake8-logging-format

3 of 11 new or added lines in 3 files covered. (27.27%)

2 existing lines in 1 file now uncovered.

383 of 666 relevant lines covered (57.51%)

2.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

23.2
/monitorcontrol/vcp/vcp_linux.py
1
from .vcp_abc import VCP, VCPIOError, VCPPermissionError
5✔
2
from types import TracebackType
5✔
3
from typing import List, Optional, Tuple, Type
5✔
4
import os
5✔
5
import struct
5✔
6
import sys
5✔
7
import time
5✔
8
import logging
5✔
9

10
# hide the Linux code from Windows CI coverage
11
if sys.platform.startswith("linux"):
5✔
12
    import fcntl
5✔
13
    import pyudev
5✔
14

15

16
class LinuxVCP(VCP):
5✔
17
    """
18
    Linux API access to a monitor's virtual control panel.
19

20
    References:
21
        https://github.com/Informatic/python-ddcci
22
        https://github.com/siemer/ddcci/
23
    """
24

25
    GET_VCP_HEADER_LENGTH = 2  # header packet length
5✔
26
    PROTOCOL_FLAG = 0x80  # protocol flag is bit 7 of the length byte
5✔
27

28
    # VCP commands
29
    GET_VCP_CMD = 0x01  # get VCP feature command
5✔
30
    GET_VCP_REPLY = 0x02  # get VCP feature reply code
5✔
31
    SET_VCP_CMD = 0x03  # set VCP feature command
5✔
32
    GET_VCP_CAPS_CMD = 0xF3  # Capabilities Request command
5✔
33
    GET_VCP_CAPS_REPLY = 0xE3  # Capabilities Request reply
5✔
34

35
    # timeouts
36
    GET_VCP_TIMEOUT = 0.04  # at least 40ms per the DDCCI specification
5✔
37
    CMD_RATE = 0.05  # at least 50ms between messages
5✔
38

39
    # addresses
40
    DDCCI_ADDR = 0x37  # DDC-CI command address on the I2C bus
5✔
41
    HOST_ADDRESS = 0x51  # virtual I2C slave address of the host
5✔
42
    I2C_SLAVE = 0x0703  # I2C bus slave address
5✔
43

44
    GET_VCP_RESULT_CODES = {
5✔
45
        0: "No Error",
46
        1: "Unsupported VCP code",
47
    }
48

49
    CHECKSUM_ERRORS: str = "ignore"
5✔
50

51
    def __init__(self, bus_number: int):
5✔
52
        """
53
        Args:
54
            bus_number: I2C bus number.
55
        """
56
        self.logger = logging.getLogger(__name__)
×
57
        self.bus_number = bus_number
×
58
        self.fd: Optional[str] = None
×
59
        self.fp: str = f"/dev/i2c-{self.bus_number}"
×
60
        # time of last feature set call
61
        self.last_set: Optional[float] = None
×
62

63
    def __enter__(self):
5✔
64
        def cleanup(fd: Optional[int]):
×
65
            if fd is not None:
×
66
                try:
×
67
                    os.close(self.fd)
×
68
                except OSError:
×
69
                    pass
×
70

71
        try:
×
72
            self.fd = os.open(self.fp, os.O_RDWR)
×
73
            fcntl.ioctl(self.fd, self.I2C_SLAVE, self.DDCCI_ADDR)
×
74
            self.read_bytes(1)
×
75
        except PermissionError as e:
×
76
            cleanup(self.fd)
×
77
            raise VCPPermissionError(f"permission error for {self.fp}") from e
×
78
        except OSError as e:
×
79
            cleanup(self.fd)
×
80
            raise VCPIOError(f"unable to open VCP at {self.fp}") from e
×
81
        except Exception as e:
×
82
            cleanup(self.fd)
×
83
            raise e
×
84
        return self
×
85

86
    def __exit__(
5✔
87
        self,
88
        exception_type: Optional[Type[BaseException]],
89
        exception_value: Optional[BaseException],
90
        exception_traceback: Optional[TracebackType],
91
    ) -> Optional[bool]:
92
        try:
×
93
            os.close(self.fd)
×
94
        except OSError as e:
×
95
            raise VCPIOError("unable to close descriptor") from e
×
96
        self.fd = None
×
97

98
        return False
×
99

100
    def set_vcp_feature(self, code: int, value: int):
5✔
101
        """
102
        Sets the value of a feature on the virtual control panel.
103

104
        Args:
105
            code: feature code
106
            value: feature value
107

108
        Raises:
109
            VCPIOError: failed to set VCP feature
110
        """
111
        self.rate_limt()
×
112

113
        # transmission data
114
        data = bytearray()
×
115
        data.append(self.SET_VCP_CMD)
×
116
        data.append(code)
×
117
        low_byte, high_byte = struct.pack("H", value)
×
118
        data.append(high_byte)
×
119
        data.append(low_byte)
×
120

121
        # add headers and footers
122
        data.insert(0, (len(data) | self.PROTOCOL_FLAG))
×
123
        data.insert(0, self.HOST_ADDRESS)
×
124
        data.append(self.get_checksum(bytearray([self.DDCCI_ADDR << 1]) + data))
×
125

126
        # write data
NEW
127
        self.logger.debug(
×
128
            "data=",
129
            extra=dict(data=" ".join([f"{x:02X}" for x in data])),
130
        )
UNCOV
131
        self.write_bytes(data)
×
132

133
        # store time of last set VCP
134
        self.last_set = time.time()
×
135

136
    def get_vcp_feature(self, code: int) -> Tuple[int, int]:
5✔
137
        """
138
        Gets the value of a feature from the virtual control panel.
139

140
        Args:
141
            code: Feature code.
142

143
        Returns:
144
            Current feature value, maximum feature value.
145

146
        Raises:
147
            VCPIOError: Failed to get VCP feature.
148
        """
149
        self.rate_limt()
×
150

151
        # transmission data
152
        data = bytearray()
×
153
        data.append(self.GET_VCP_CMD)
×
154
        data.append(code)
×
155

156
        # add headers and footers
157
        data.insert(0, (len(data) | self.PROTOCOL_FLAG))
×
158
        data.insert(0, self.HOST_ADDRESS)
×
159
        data.append(self.get_checksum(bytearray([self.DDCCI_ADDR << 1]) + data))
×
160

161
        # write data
NEW
162
        self.logger.debug(
×
163
            "data=",
164
            extra=dict(data=" ".join([f"{x:02X}" for x in data])),
165
        )
UNCOV
166
        self.write_bytes(data)
×
167

168
        time.sleep(self.GET_VCP_TIMEOUT)
×
169

170
        # read the data
171
        header = self.read_bytes(self.GET_VCP_HEADER_LENGTH)
×
NEW
172
        self.logger.debug(
×
173
            "header={header}",
174
            extra=dict(header=" ".join([f"{x:02X}" for x in header])),
175
        )
176
        source, length = struct.unpack("=BB", header)
×
177
        length &= ~self.PROTOCOL_FLAG  # clear protocol flag
×
178
        payload = self.read_bytes(length + 1)
×
NEW
179
        self.logger.debug(
×
180
            "payload={payload}",
181
            extra=dict(payload=" ".join([f"{x:02X}" for x in payload])),
182
        )
183

184
        # check checksum
185
        payload, checksum = struct.unpack(f"={length}sB", payload)
×
186
        calculated_checksum = self.get_checksum(header + payload)
×
187
        checksum_xor = checksum ^ calculated_checksum
×
188
        if checksum_xor:
×
189
            message = f"checksum does not match: {checksum_xor}"
×
190
            if self.CHECKSUM_ERRORS.lower() == "strict":
×
191
                raise VCPIOError(message)
×
192
            elif self.CHECKSUM_ERRORS.lower() == "warning":
×
193
                self.logger.warning(message)
×
194
            # else ignore
195

196
        # unpack the payload
197
        (
×
198
            reply_code,
199
            result_code,
200
            vcp_opcode,
201
            vcp_type_code,
202
            feature_max,
203
            feature_current,
204
        ) = struct.unpack(">BBBBHH", payload)
205

206
        if reply_code != self.GET_VCP_REPLY:
×
207
            raise VCPIOError(f"received unexpected response code: {reply_code}")
×
208

209
        if vcp_opcode != code:
×
210
            raise VCPIOError(f"received unexpected opcode: {vcp_opcode}")
×
211

212
        if result_code > 0:
×
213
            try:
×
214
                message = self.GET_VCP_RESULT_CODES[result_code]
×
215
            except KeyError:
×
216
                message = f"received result with unknown code: {result_code}"
×
217
            raise VCPIOError(message)
×
218

219
        return feature_current, feature_max
×
220

221
    def get_vcp_capabilities(self) -> str:
5✔
222
        """
223
        Gets capabilities string from the virtual control panel.
224

225
        Returns:
226
            One long capabilities string in the format:
227
            "(prot(monitor)type(LCD)model(ACER VG271U)cmds(01 02 03 07 0C)"
228

229
            No error checking for the string being valid. String can have
230
            bit errors or dropped characters.
231

232
        Raises:
233
            VCPError: Failed to get VCP feature.
234
        """
235

236
        # Create an empty capabilities string to be filled with the data
237
        caps_str = ""
×
238

239
        self.rate_limt()
×
240

241
        # Get the first 32B of capabilities string
242
        offset = 0
×
243

244
        # Keep a count going to keep things sane
245
        loop_count = 0
×
246
        loop_count_limit = 40
×
247

248
        while loop_count < loop_count_limit:
×
249
            loop_count += 1
×
250

251
            # transmission data
252
            data = bytearray()
×
253
            data.append(self.GET_VCP_CAPS_CMD)
×
254
            low_byte, high_byte = struct.pack("H", offset)
×
255
            data.append(high_byte)
×
256
            data.append(low_byte)
×
257

258
            # add headers and footers
259
            data.insert(0, (len(data) | self.PROTOCOL_FLAG))
×
260
            data.insert(0, self.HOST_ADDRESS)
×
261
            data.append(self.get_checksum(data))
×
262

263
            # write data
264
            self.write_bytes(data)
×
265

266
            time.sleep(self.GET_VCP_TIMEOUT)
×
267

268
            # read the data
269
            header = self.read_bytes(self.GET_VCP_HEADER_LENGTH)
×
NEW
270
            self.logger.debug(
×
271
                "header={header}",
272
                extra=dict(header=" ".join([f"{x:02X}" for x in header])),
273
            )
274
            source, length = struct.unpack("BB", header)
×
275
            length &= ~self.PROTOCOL_FLAG  # clear protocol flag
×
276
            payload = self.read_bytes(length + 1)
×
NEW
277
            self.logger.debug(
×
278
                "payload={payload}",
279
                extra=dict(payload=" ".join([f"{x:02X}" for x in payload])),
280
            )
281

282
            # check if length is valid
283
            if length < 3 or length > 35:
×
284
                raise VCPIOError(f"received unexpected response length: {length}")
×
285

286
            # check checksum
287
            payload, checksum = struct.unpack(f"{length}sB", payload)
×
288
            calculated_checksum = self.get_checksum(header + payload)
×
289
            checksum_xor = checksum ^ calculated_checksum
×
290
            if checksum_xor:
×
291
                message = f"checksum does not match: {checksum_xor}"
×
292
                if self.CHECKSUM_ERRORS.lower() == "strict":
×
293
                    raise VCPIOError(message)
×
294
                elif self.CHECKSUM_ERRORS.lower() == "warning":
×
295
                    self.logger.warning(message)
×
296
                # else ignore
297
            # remove checksum from length
298

299
            # unpack the payload
300
            reply_code, payload = struct.unpack(f">B{length - 1}s", payload)
×
301
            length -= 1
×
302

303
            if reply_code != self.GET_VCP_CAPS_REPLY:
×
304
                raise VCPIOError(f"received unexpected response code: {reply_code}")
×
305

306
            # unpack the payload
307
            offset, payload = struct.unpack(f">H{length - 2}s", payload)
×
308
            length -= 2
×
309

310
            if length > 0:
×
311
                caps_str += payload.decode("ASCII")
×
312
            else:
313
                break
×
314

315
            # update the offset and go again
316
            offset += length
×
317

NEW
318
        self.logger.debug("caps str={caps_str}", extra=dict(caps_str=caps_str))
×
319

320
        if loop_count >= loop_count_limit:
×
321
            raise VCPIOError("Capabilities string incomplete or too long")
×
322

323
        return caps_str
×
324

325
    @staticmethod
5✔
326
    def get_checksum(data: bytearray) -> int:
5✔
327
        """
328
        Computes the checksum for a set of data, with the option to
329
        use the virtual host address (per the DDC-CI specification).
330

331
        Args:
332
            data: Data array to transmit.
333

334
        Returns:
335
            Checksum for the data.
336
        """
337
        checksum = 0x00
5✔
338
        for data_byte in data:
5✔
339
            checksum ^= data_byte
5✔
340
        return checksum
5✔
341

342
    def rate_limt(self):
5✔
343
        """Rate limits messages to the VCP."""
344
        if self.last_set is None:
×
345
            return
×
346

347
        rate_delay = self.CMD_RATE - time.time() - self.last_set
×
348
        if rate_delay > 0:
×
349
            time.sleep(rate_delay)
×
350

351
    def read_bytes(self, num_bytes: int) -> bytes:
5✔
352
        """
353
        Reads bytes from the I2C bus.
354

355
        Args:
356
            num_bytes: number of bytes to read
357

358
        Raises:
359
            VCPIOError: unable to read data
360
        """
361
        try:
×
362
            return os.read(self.fd, num_bytes)
×
363
        except OSError as e:
×
364
            raise VCPIOError("unable to read from I2C bus") from e
×
365

366
    def write_bytes(self, data: bytes):
5✔
367
        """
368
        Writes bytes to the I2C bus.
369

370
        Args:
371
            data: data to write to the I2C bus
372

373
        Raises:
374
            VCPIOError: unable to write data
375
        """
376
        try:
×
377
            os.write(self.fd, data)
×
378
        except OSError as e:
×
379
            raise VCPIOError("unable write to I2C bus") from e
×
380

381

382
def get_vcps() -> List[LinuxVCP]:
5✔
383
    """
384
    Interrogates I2C buses to determine if they are DDC-CI capable.
385

386
    Returns:
387
        List of all VCPs detected.
388
    """
389
    vcps = []
5✔
390

391
    # iterate I2C devices
392
    for device in pyudev.Context().list_devices(subsystem="i2c"):
5✔
393
        vcp = LinuxVCP(device.sys_number)
×
394
        try:
×
395
            with vcp:
×
396
                pass
×
397
        except (OSError, VCPIOError):
×
398
            pass
×
399
        else:
400
            vcps.append(vcp)
×
401

402
    return vcps
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc