• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

thunze / diskfs / 8087910864

28 Feb 2024 10:24PM UTC coverage: 53.54% (+0.07%) from 53.471%
8087910864

push

github

thunze
Minor cosmetic change in `pyproject.toml`

1898 of 3545 relevant lines covered (53.54%)

4.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

26.36
/diskfs/disk.py
1
"""Disk access.
2

3
A disk represents either a file or block device that one can access and manipulate.
4
"""
5

6
from __future__ import annotations
9✔
7

8
import logging
9✔
9
import os
9✔
10
import sys
9✔
11
from stat import S_ISBLK, S_ISREG
9✔
12
from types import TracebackType
9✔
13
from typing import TYPE_CHECKING, Any
9✔
14

15
from . import gpt, mbr
9✔
16
from .base import SectorSize, ValidationError
9✔
17
from .table import Table
9✔
18
from .volume import Volume
9✔
19

20
# Pick the platform-specific implementations of the block device helpers.
# Every supported platform module exposes the same three names, so the rest of
# this module can use them without caring which platform it runs on.
if sys.platform == "win32":
    from .win32 import device_sector_size, device_size, reread_partition_table
elif sys.platform == "linux":
    from .linux import device_sector_size, device_size, reread_partition_table
elif sys.platform == "darwin":
    from .darwin import device_sector_size, device_size, reread_partition_table
else:
    # Fail at import time instead of on first use of a missing helper.
    raise RuntimeError(f"Unsupported platform {sys.platform!r}")
28

29
if TYPE_CHECKING:
30
    from .typing_ import ReadableBuffer, StrPath
31

32
__all__ = ["Disk"]
9✔
33

34

35
log = logging.getLogger(__name__)
9✔
36

37

38
if not (hasattr(os, "pread") and hasattr(os, "pwrite")):
    # No positional I/O available: emulate it by seeking first and then
    # reading or writing at the current file offset.

    def _read(fd: int, size: int, pos: int) -> bytes:
        """Seek `fd` to byte offset `pos`, then read up to `size` bytes."""
        os.lseek(fd, pos, os.SEEK_SET)
        return os.read(fd, size)

    def _write(fd: int, b: ReadableBuffer, pos: int) -> int:
        """Seek `fd` to byte offset `pos`, then write the raw bytes `b`.

        Returns the value returned by `os.write()`.
        """
        os.lseek(fd, pos, os.SEEK_SET)
        return os.write(fd, b)

else:
    # Positional I/O is available, use it directly.
    _read = os.pread
    _write = os.pwrite
52

53

54
class Disk:
    """File or block device that one can access and manipulate.

    Also serves as an accessor to the underlying file or block device.

    Do not use `__init__` directly, use `Disk.open()` or `Disk.new()` instead.
    """

    def __init__(
        self,
        fd: int,
        path: StrPath,
        size: int,
        sector_size: SectorSize,
        *,
        device: bool,
        writable: bool,
    ):
        """Wrap the already opened file descriptor `fd`.

        After storing the given values, `read_table()` is called once to detect
        a partition table on the disk.

        :param fd: Open file descriptor of the file or block device.
        :param path: Path the descriptor belongs to; stored as `str`.
        :param size: Size of the disk in bytes.
        :param sector_size: Logical and physical sector size of the disk.
        :param device: Whether the disk is a block device (as opposed to a file).
        :param writable: Whether write operations are allowed on the disk.
        """
        self._fd = fd
        self._path = str(path)
        self._size = size
        self._sector_size = sector_size
        self._device = device
        self._writable = writable

        self._closed = False
        self._table: Table | None = None

        log.info(f"Opened disk {self}")
        log.info(f"{self} - Size: {size} bytes, {sector_size}")
        self.read_table()

    @classmethod
    def new(cls, path: StrPath, size: int, *, sector_size: int = 512) -> Disk:
        """Create a new disk image at `path`.

        :param path: Path of the image file to create; must not exist yet.
        :param size: Size of the image in bytes.
        :param sector_size: Sector size to simulate, used as both the logical
            and the physical sector size.
        :raises ValueError: If `size` or `sector_size` is not greater than 0.
        """
        if size <= 0:
            raise ValueError("Disk size must be greater than 0")
        if sector_size <= 0:
            raise ValueError("Sector size must be greater than 0")

        # O_EXCL makes the call fail instead of reusing an existing file.
        flags = os.O_CREAT | os.O_EXCL | os.O_RDWR | getattr(os, "O_BINARY", 0)
        fd = os.open(path, flags, 0o666)
        try:
            os.truncate(fd, size)
            simulated_sector_size = SectorSize(sector_size, sector_size)
            return cls(
                fd, path, size, simulated_sector_size, device=False, writable=True
            )
        except BaseException:
            # Do not leak the descriptor if setting up the disk fails.
            os.close(fd)
            raise

    @classmethod
    def open(
        cls, path: StrPath, *, sector_size: int | None = None, readonly: bool = True
    ) -> Disk:
        """Open block device or disk image at `path`.

        :param path: Path of the block device or image file.
        :param sector_size: Sector size to simulate for an image file. Must be
            given for regular files and must not be given for block devices,
            whose sector size is queried from the device.
        :param readonly: Whether to open the disk read-only.
        :raises ValueError: If `sector_size` does not fit the kind of file, or if
            `path` is neither a block device nor a regular file.
        """
        read_write_flag = os.O_RDONLY if readonly else os.O_RDWR
        flags = read_write_flag | getattr(os, "O_BINARY", 0)
        fd = os.open(path, flags)

        try:
            stat = os.fstat(fd)
            block_device = S_ISBLK(stat.st_mode)
            regular_file = S_ISREG(stat.st_mode)

            if block_device:
                if sector_size is not None:
                    raise ValueError("Sector size cannot be set for block devices")

                size = device_size(fd)
                real_sector_size = device_sector_size(fd)
                return cls(
                    fd, path, size, real_sector_size, device=True, writable=not readonly
                )

            if regular_file:
                if sector_size is None:
                    raise ValueError("Sector size must be set for regular file")
                if sector_size <= 0:
                    raise ValueError("Sector size must be greater than 0")

                size = stat.st_size
                simulated_sector_size = SectorSize(sector_size, sector_size)
                return cls(
                    fd,
                    path,
                    size,
                    simulated_sector_size,
                    device=False,
                    writable=not readonly,
                )

            raise ValueError("File is neither a block device nor a regular file")

        except BaseException:
            # Do not leak the descriptor if setting up the disk fails.
            os.close(fd)
            raise

    def read_at(self, pos: int, size: int) -> bytes:
        """Read `size` sectors from the disk starting at sector `pos`.

        Uses the logical sector size of the disk.

        :param pos: LBA to read from.
        :param size: Amount of sectors to read.
        :raises ValueError: If the disk is closed, `pos` or `size` is negative,
            the sector range exceeds the disk size, or fewer bytes than
            requested could be read.
        """
        self.check_closed()

        if pos < 0:
            raise ValueError("Position to read from must be zero or positive")
        if size < 0:
            raise ValueError("Amount of sectors to read must be zero or positive")
        if size == 0:
            return b""

        pos_bytes = pos * self.sector_size.logical
        size_bytes = size * self.sector_size.logical

        if pos_bytes + size_bytes > self._size:
            raise ValueError("Sector range out of disk bounds")

        b = _read(self._fd, size_bytes, pos_bytes)

        if len(b) != size_bytes:
            # Report the expected amount in bytes; `size` counts sectors.
            raise ValueError(
                "Did not read the expected amount of bytes "
                f"(expected {size_bytes} bytes, got {len(b)} bytes)"
            )
        return b

    def write_at(
        self, pos: int, b: ReadableBuffer, *, fill_zeroes: bool = False
    ) -> None:
        """Write raw bytes `b` to the disk starting at sector `pos`.

        Uses the logical sector size of the disk.

        :param pos: LBA to write at.
        :param b: Bytes to write.
        :param fill_zeroes: Whether to fill up the last sector to write at with zeroes
            if b doesn't cover the whole sector.
        :raises ValueError: If the disk is closed or not writable, `pos` is
            negative, `b` is not a multiple of the logical sector size while
            `fill_zeroes` is not set, the sector range exceeds the disk size, or
            fewer bytes than expected were written.
        """
        self.check_closed()
        self.check_writable()

        if pos < 0:
            raise ValueError("Position to write at must be zero or positive")
        if not isinstance(b, memoryview):
            b = memoryview(b).cast("B")
        size = b.nbytes
        if size == 0:
            return

        lss = self._sector_size.logical
        remainder = size % lss

        if remainder != 0:
            if not fill_zeroes:
                raise ValueError(
                    f"Can only write in multiples of {lss} bytes (logical sector size)"
                )
            # Pad the data so that it ends on a sector boundary.
            zeroes = b"\x00" * (lss - remainder)
            b = bytes(b) + zeroes
            size = len(b)

        pos_bytes = pos * self.sector_size.logical
        if pos_bytes + size > self._size:
            raise ValueError("Sector range out of disk bounds")

        bytes_written = _write(self._fd, b, pos_bytes)

        if bytes_written != size:
            raise ValueError(
                f"Did not write the expected amount of bytes (expected {size} "
                f"bytes, wrote {bytes_written} bytes)"
            )

    def flush(self) -> None:
        """Flush write buffers of the underlying file or block device, if applicable."""
        self.check_closed()
        os.fsync(self._fd)

    def read_table(self) -> None:
        """Try to read a partition table on the disk and update the `Disk` object
        accordingly.

        If no partition table can be parsed, the disk is considered unpartitioned.
        """
        self.check_closed()
        # GPT is tried first, MBR is only tried if parsing a GPT failed.
        try:
            self._table = gpt.Table.from_disk(self)
        except ValidationError:
            try:
                self._table = mbr.Table.from_disk(self)
            except ValidationError:
                # no valid partition table found
                self._table = None

        if self._table is None:
            log.info(f"{self} - No valid partition table found")
        else:
            log.info(f"{self} - Found partition table {self._table}")

    def clear(self) -> None:
        """Clear the disk by overwriting specific parts of the disk with zeroes.

        **Caution:** This will overwrite the disk's partition table and thus remove
        access to any partitions residing on the disk. If any file systems reside on
        the disk, they will very likely be destroyed as well. Always create a backup
        of your data before clearing a disk.

        Clearing is not implemented yet: after checking that the disk is open and
        writable, `NotImplementedError` is raised without modifying the disk or
        the `Disk` object.
        """
        self.check_closed()
        self.check_writable()

        # Raise *before* forgetting the cached partition table. Dropping the table
        # without having wiped it from the disk would make `partition()` accept
        # a disk that is in fact still partitioned.
        raise NotImplementedError

    def partition(self, table: Table) -> None:
        """Apply a partition table to the disk.

        **Caution:** If a file system resides on the unpartitioned disk, it will very
        likely be overwritten and thus be rendered unusable. Always create a backup
        of your data before (re-)partitioning a disk.

        If the disk is already partitioned, `ValueError` will be raised.

        :param table: Partition table to write to the disk.
        """
        self.check_closed()
        self.check_writable()

        if self._table is not None:
            raise ValueError(
                "Disk is already partitioned; clear disk first to re-partition"
            )
        log.info(f"{self} - Partitioning disk using partition table {table}")

        # noinspection PyProtectedMember
        table._write_to_disk(self)
        self.flush()
        self._table = table

        if self._device:
            reread_partition_table(self._fd)

    def volume(self, partition: int | None = None) -> Volume:
        """Get the volume corresponding to partition `partition` on the disk.

        If `partition` is not specified and the disk is unpartitioned, a volume
        spanning the whole disk is returned.

        :param partition: Index into the partitions of the partition table.
        :raises ValueError: If `partition` is omitted for a partitioned disk or
            given for an unpartitioned disk.
        :raises IndexError: If `partition` is out of range.
        """
        self.check_closed()
        if partition is None:
            if self._table is not None:
                raise ValueError(
                    "Disk is partitioned; please specify a partition number"
                )
            disk_end = self._size // self.sector_size.logical - 1
            return Volume(self, 0, disk_end)

        if self._table is None:
            raise ValueError(
                "Disk is unpartitioned; you cannot specify a partition number"
            )
        if not 0 <= partition < len(self._table.partitions):
            raise IndexError("Partition number out of range")

        entry = self._table.partitions[partition]
        return Volume(self, entry.start_lba, entry.end_lba)

    def dismount_volumes(self) -> None:
        """Dismount all volumes associated with the disk.

        Not implemented yet; raises `NotImplementedError` for block devices and
        `ValueError` for disk images.
        """
        self.check_closed()
        if not self._device:
            raise ValueError("Can only dismount volumes of block devices")
        log.info(f"{self} - Dismounting volumes")
        raise NotImplementedError

    def close(self) -> None:
        """Close the underlying IO object.

        This method has no effect if the IO object is already closed.
        """
        if self._closed:
            return
        os.close(self._fd)
        self._closed = True
        log.info(f"Closed disk {self}")

    def __enter__(self) -> Disk:
        """Context management protocol."""
        self.check_closed()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Context management protocol."""
        self.close()

    @property
    def device(self) -> bool:
        """Whether the disk's data resides on a block device instead of a file."""
        return self._device

    @property
    def size(self) -> int:
        """Size of the disk in bytes."""
        return self._size

    @property
    def sector_size(self) -> SectorSize:
        """Logical and physical sector size of the disk, each in bytes."""
        return self._sector_size

    @property
    def table(self) -> Table | None:
        """Partition table last detected on the disk.

        `None` if no partition table was detected at that time.
        """
        return self._table

    @property
    def closed(self) -> bool:
        """Whether the underlying file or block device is closed."""
        return self._closed

    @property
    def writable(self) -> bool:
        """Whether the underlying file or block device supports writing."""
        self.check_closed()
        return self._writable

    def check_closed(self) -> None:
        """Raise `ValueError` if the underlying file or block device is closed."""
        if self._closed:
            raise ValueError("I/O operation on closed disk")

    def check_writable(self) -> None:
        """Raise `ValueError` if the underlying file or block device is read-only."""
        if not self._writable:
            raise ValueError("Disk is not writable")

    def __eq__(self, other: Any) -> bool:
        """Two disks are considered equal if they were opened from the same path."""
        if isinstance(other, Disk):
            return self._path == other._path
        return NotImplemented

    def __hash__(self) -> int:
        """Hash by path, consistent with `__eq__`.

        Defining `__eq__` alone would make instances unhashable.
        """
        return hash(self._path)

    def __str__(self) -> str:
        return self._path

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self._path}, size={self._size})"
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc