• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sivakov512 / asyncpg-engine / 16981542026

15 Aug 2025 02:37AM UTC coverage: 46.25%. Remained the same
16981542026

push

github

sivakov512
Update dependency pytest to v8

37 of 80 relevant lines covered (46.25%)

1.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.23
/asyncpg_engine/engine.py
1
# Public API of this module: only ``Engine`` is exported by ``from ... import *``.
__all__ = ["Engine"]
×
2

3
import typing
×
4
from types import TracebackType
×
5

6
from asyncpg import Connection, create_pool
×
7
from asyncpg.pool import Pool
×
8

9

10
class Engine:
    """Thin wrapper around an asyncpg connection pool.

    In single-connection mode (``use_single_connection=True``) every acquire
    hands out the same pooled connection, and that connection only goes back
    to the pool on a forced release or when the engine is closed.
    """

    __slots__ = "_pool", "_use_single_con", "_global_con"

    _pool: "Pool"
    _use_single_con: bool
    # Shared connection for single-connection mode; ``None`` until the first
    # acquire and again after it has been released back to the pool.
    _global_con: "typing.Optional[Connection]"

    def __init__(self, pool: "Pool", use_single_connection: bool):
        """Wrap an existing *pool*; see :meth:`create` for the usual entry point."""
        self._pool = pool
        self._use_single_con = use_single_connection

        self._global_con = None

    @classmethod
    async def create(
        cls, url: str, *, use_single_connection: bool = False, **kwargs: typing.Any
    ) -> "Engine":
        """Create a pool for *url* and return an engine wrapping it.

        Extra keyword arguments are forwarded to ``create_pool``.  ``min_size``
        defaults to 2 but may be overridden by the caller.
        """
        # Use setdefault instead of a hard-coded keyword: passing ``min_size``
        # explicitly would otherwise fail with a duplicate-keyword TypeError.
        kwargs.setdefault("min_size", 2)
        pool = await create_pool(url, init=cls._set_codecs, **kwargs)
        return cls(pool, use_single_connection)

    async def close(self) -> None:
        """Release the shared connection (if one is held) and close the pool."""
        # The pool's close waits for outstanding connections to be released,
        # so a still-held shared connection must be handed back first.
        if self._global_con is not None:
            await self.release(self._global_con, force=True)
        await self._pool.close()

    def acquire(self) -> "ConnectionAcquire":
        """Return an object that can be awaited or used with ``async with``."""
        return ConnectionAcquire(self)

    async def _acquire(self) -> "Connection":
        """Acquire a connection, reusing the shared one in single-connection mode."""
        if self._use_single_con:
            self._global_con = self._global_con or await self._pool.acquire()
            return self._global_con

        return await self._pool.acquire()

    async def release(self, con: "Connection", *, force: bool = False) -> None:
        """Return *con* to the pool.

        In single-connection mode this is a no-op unless *force* is true, so
        the shared connection stays checked out between uses.
        """
        if not self._use_single_con or force:
            await self._pool.release(con)
            self._global_con = None

    async def healthcheck(self) -> None:
        """Run ``SELECT 1`` on an acquired connection."""
        async with self.acquire() as con:  # type: Connection
            await con.execute("SELECT 1")

    @staticmethod
    async def _set_codecs(con: "Connection") -> None:
        """Override this method if you want to set custom codecs."""
55

56

57
class ConnectionAcquire:
    """Awaitable / async context manager that acquires a connection from an engine.

    ``await engine.acquire()`` yields the connection directly, while
    ``async with engine.acquire() as con`` also releases it through the
    engine on exit.
    """

    __slots__ = "engine", "con"

    engine: "Engine"
    # Connection obtained by the last acquire; ``None`` before acquiring and
    # after the context manager has released it.
    con: "typing.Optional[Connection]"

    def __init__(self, engine: "Engine"):
        self.engine = engine
        self.con = None

    async def __call__(self) -> "Connection":
        """Acquire a connection from the engine and remember it on ``self.con``."""
        self.con = await self.engine._acquire()
        return self.con

    def __await__(self) -> "typing.Generator[typing.Any, None, Connection]":
        """Allow ``await acquire_obj``; the awaited result is the connection."""
        return self().__await__()

    async def __aenter__(self) -> "Connection":
        """Acquire and return the connection for the ``async with`` body."""
        return await self()

    async def __aexit__(
        self,
        exc_type: typing.Optional[typing.Type[BaseException]],
        exc: typing.Optional[BaseException],
        tv: typing.Optional[TracebackType],
    ) -> None:
        """Hand the connection back to the engine; exceptions are not suppressed."""
        # Drop our reference first so a released connection is never left
        # reachable through ``self.con``.
        con, self.con = self.con, None
        if con is not None:
            await self.engine.release(con)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc