• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mosquito / caio / 22691535268

04 Mar 2026 10:03PM UTC coverage: 88.182%. Remained the same
22691535268

push

github

mosquito
new ci/cd

291 of 330 relevant lines covered (88.18%)

11.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.1
/caio/python_aio.py
1
import os
14✔
2
from collections import defaultdict
14✔
3
from enum import IntEnum, unique
14✔
4
from io import BytesIO
14✔
5
from multiprocessing.pool import ThreadPool
14✔
6
from threading import Lock, RLock
14✔
7
from types import MappingProxyType
14✔
8
from typing import Any, Callable, Optional, Union
14✔
9

10
from .abstract import AbstractContext, AbstractOperation
14✔
11

12

13
fdsync = getattr(os, "fdatasync", os.fsync)
14✔
14
NATIVE_PREAD_PWRITE = hasattr(os, "pread") and hasattr(os, "pwrite")
14✔
15

16

17
@unique
14✔
18
class OpCode(IntEnum):
14✔
19
    READ = 0
14✔
20
    WRITE = 1
14✔
21
    FSYNC = 2
14✔
22
    FDSYNC = 3
14✔
23
    NOOP = -1
14✔
24

25

26
class Context(AbstractContext):
14✔
27
    """
28
    python aio context implementation
29
    """
30

31
    MAX_POOL_SIZE = 128
14✔
32

33
    def __init__(self, max_requests: int = 32, pool_size: int = 8):
14✔
34
        assert pool_size < self.MAX_POOL_SIZE
14✔
35

36
        self.__max_requests = max_requests
14✔
37
        self.pool = ThreadPool(pool_size)
14✔
38
        self._in_progress = 0
14✔
39
        self._closed = False
14✔
40
        self._closed_lock = Lock()
14✔
41

42
        if not NATIVE_PREAD_PWRITE:
14✔
43
            self._locks_cleaner = RLock()       # type: ignore
4✔
44
            self._locks = defaultdict(RLock)    # type: ignore
4✔
45

46
    @property
14✔
47
    def max_requests(self) -> int:
14✔
48
        return self.__max_requests
×
49

50
    def _execute(self, operation: "Operation"):
14✔
51
        handler = self._OP_MAP[operation.opcode]
14✔
52

53
        def on_error(exc):
14✔
54
            self._in_progress -= 1
14✔
55
            operation.exception = exc
14✔
56
            operation.written = 0
14✔
57
            operation.callback(None)
14✔
58

59
        def on_success(result):
14✔
60
            self._in_progress -= 1
14✔
61
            operation.written = result
14✔
62
            operation.callback(result)
14✔
63

64
        if self._in_progress > self.__max_requests:
14✔
65
            raise RuntimeError(
×
66
                "Maximum simultaneous requests have been reached",
67
            )
68

69
        self._in_progress += 1
14✔
70

71
        self.pool.apply_async(
14✔
72
            handler, args=(self, operation),
73
            callback=on_success,
74
            error_callback=on_error,
75
        )
76

77
    if NATIVE_PREAD_PWRITE:
14✔
78
        def __pread(self, fd, size, offset):
10✔
79
            return os.pread(fd, size, offset)
10✔
80

81
        def __pwrite(self, fd, bytes, offset):
10✔
82
            return os.pwrite(fd, bytes, offset)
10✔
83
    else:
84
        def __pread(self, fd, size, offset):
4✔
85
            with self._locks[fd]:
4✔
86
                os.lseek(fd, 0, os.SEEK_SET)
4✔
87
                os.lseek(fd, offset, os.SEEK_SET)
4✔
88
                return os.read(fd, size)
4✔
89

90
        def __pwrite(self, fd, bytes, offset):
4✔
91
            with self._locks[fd]:
4✔
92
                os.lseek(fd, 0, os.SEEK_SET)
4✔
93
                os.lseek(fd, offset, os.SEEK_SET)
4✔
94
                return os.write(fd, bytes)
4✔
95

96
    def _handle_read(self, operation: "Operation"):
14✔
97
        return operation.buffer.write(
14✔
98
            self.__pread(
99
                operation.fileno,
100
                operation.nbytes,
101
                operation.offset,
102
            ),
103
        )
104

105
    def _handle_write(self, operation: "Operation"):
14✔
106
        return self.__pwrite(
14✔
107
            operation.fileno, operation.buffer.getvalue(), operation.offset,
108
        )
109

110
    def _handle_fsync(self, operation: "Operation"):
14✔
111
        return os.fsync(operation.fileno)
×
112

113
    def _handle_fdsync(self, operation: "Operation"):
14✔
114
        return fdsync(operation.fileno)
14✔
115

116
    def _handle_noop(self, operation: "Operation"):
14✔
117
        return
×
118

119
    def submit(self, *aio_operations) -> int:
14✔
120
        operations = []
14✔
121

122
        for operation in aio_operations:
14✔
123
            if not isinstance(operation, Operation):
14✔
124
                raise ValueError("Invalid Operation %r", operation)
×
125

126
            operations.append(operation)
14✔
127

128
        count = 0
14✔
129
        for operation in operations:
14✔
130
            self._execute(operation)
14✔
131
            count += 1
14✔
132

133
        return count
14✔
134

135
    def cancel(self, *aio_operations) -> int:
14✔
136
        """
137
        Cancels multiple Operations. Returns
138

139
         Operation.cancel(aio_op1, aio_op2, aio_opN, ...) -> int
140

141
        (Always returns zero, this method exists for compatibility reasons)
142
        """
143
        return 0
14✔
144

145
    def close(self):
14✔
146
        if self._closed:
14✔
147
            return
×
148

149
        with self._closed_lock:
14✔
150
            self.pool.close()
14✔
151
            self._closed = True
14✔
152

153
    def __del__(self):
14✔
154
        if self.pool.close():
14✔
155
            self.close()
×
156

157
    _OP_MAP = MappingProxyType({
14✔
158
        OpCode.READ: _handle_read,
159
        OpCode.WRITE: _handle_write,
160
        OpCode.FSYNC: _handle_fsync,
161
        OpCode.FDSYNC: _handle_fdsync,
162
        OpCode.NOOP: _handle_noop,
163
    })
164

165

166
# noinspection PyPropertyDefinition
167
class Operation(AbstractOperation):
14✔
168
    """
169
    python aio operation implementation
170
    """
171
    def __init__(
14✔
172
        self,
173
        fd: int,
174
        nbytes: Optional[int],
175
        offset: Optional[int],
176
        opcode: OpCode,
177
        payload: Optional[bytes] = None,
178
        priority: Optional[int] = None,
179
    ):
180
        self.callback = None    # type: Optional[Callable[[int], Any]]
14✔
181
        self.buffer = BytesIO()
14✔
182

183
        if opcode == OpCode.WRITE and payload:
14✔
184
            self.buffer = BytesIO(payload)
14✔
185

186
        self.opcode = opcode
14✔
187
        self.__fileno = fd
14✔
188
        self.__offset = offset or 0
14✔
189
        self.__opcode = opcode
14✔
190
        self.__nbytes = nbytes or 0
14✔
191
        self.__priority = priority or 0
14✔
192
        self.exception = None
14✔
193
        self.written = 0
14✔
194

195
    @classmethod
14✔
196
    def read(
14✔
197
        cls, nbytes: int, fd: int, offset: int, priority=0,
198
    ) -> "Operation":
199
        """
200
        Creates a new instance of Operation on read mode.
201
        """
202
        return cls(fd, nbytes, offset, opcode=OpCode.READ, priority=priority)
14✔
203

204
    @classmethod
14✔
205
    def write(
14✔
206
        cls, payload_bytes: bytes, fd: int, offset: int, priority=0,
207
    ) -> "Operation":
208
        """
209
        Creates a new instance of AIOOperation on write mode.
210
        """
211
        return cls(
14✔
212
            fd,
213
            len(payload_bytes),
214
            offset,
215
            payload=payload_bytes,
216
            opcode=OpCode.WRITE,
217
            priority=priority,
218
        )
219

220
    @classmethod
14✔
221
    def fsync(cls, fd: int, priority=0) -> "Operation":
14✔
222

223
        """
224
        Creates a new instance of AIOOperation on fsync mode.
225
        """
226
        return cls(fd, None, None, opcode=OpCode.FSYNC, priority=priority)
×
227

228
    @classmethod
14✔
229
    def fdsync(cls, fd: int, priority=0) -> "Operation":
14✔
230

231
        """
232
        Creates a new instance of AIOOperation on fdsync mode.
233
        """
234
        return cls(fd, None, None, opcode=OpCode.FDSYNC, priority=priority)
14✔
235

236
    def get_value(self) -> Union[bytes, int]:
14✔
237
        """
238
        Method returns a bytes value of AIOOperation's result or None.
239
        """
240
        if self.exception:
14✔
241
            raise self.exception
14✔
242

243
        if self.opcode == OpCode.WRITE:
14✔
244
            return self.written
14✔
245

246
        if self.buffer is None:
14✔
247
            return
×
248

249
        return self.buffer.getvalue()
14✔
250

251
    @property
14✔
252
    def fileno(self) -> int:
14✔
253
        return self.__fileno
14✔
254

255
    @property
14✔
256
    def offset(self) -> int:
14✔
257
        return self.__offset
14✔
258

259
    @property
14✔
260
    def payload(self) -> Optional[memoryview]:
14✔
261
        return self.buffer.getbuffer()
×
262

263
    @property
14✔
264
    def nbytes(self) -> int:
14✔
265
        return self.__nbytes
14✔
266

267
    def set_callback(self, callback: Callable[[int], Any]) -> bool:
14✔
268
        self.callback = callback
14✔
269
        return True
14✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc