• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / anyio / 10653597381

01 Sep 2024 11:25AM UTC coverage: 91.671% (-0.1%) from 91.788%
10653597381

Pull #761

github

web-flow
Merge 6d0f355bd into 8a5b34626
Pull Request #761: Delegated the implementations of Lock and Semaphore to the async backend class

229 of 250 new or added lines in 4 files covered. (91.6%)

2 existing lines in 2 files now uncovered.

4744 of 5175 relevant lines covered (91.67%)

9.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.1
/src/anyio/_core/_testing.py
1
from __future__ import annotations
11✔
2

3
from collections.abc import Awaitable, Generator
11✔
4
from typing import Any, cast
11✔
5

6
from ._eventloop import get_async_backend
11✔
7

8

9
class TaskInfo:
11✔
10
    """
11
    Represents an asynchronous task.
12

13
    :ivar int id: the unique identifier of the task
14
    :ivar parent_id: the identifier of the parent task, if any
15
    :vartype parent_id: Optional[int]
16
    :ivar str name: the description of the task (if any)
17
    :ivar ~collections.abc.Coroutine coro: the coroutine object of the task
18
    """
19

20
    __slots__ = "_name", "id", "parent_id", "name", "coro"
11✔
21

22
    def __init__(
11✔
23
        self,
24
        id: int,
25
        parent_id: int | None,
26
        name: str | None,
27
        coro: Generator[Any, Any, Any] | Awaitable[Any],
28
    ):
29
        func = get_current_task
11✔
30
        self._name = f"{func.__module__}.{func.__qualname__}"
11✔
31
        self.id: int = id
11✔
32
        self.parent_id: int | None = parent_id
11✔
33
        self.name: str | None = name
11✔
34
        self.coro: Generator[Any, Any, Any] | Awaitable[Any] = coro
11✔
35

36
    def __eq__(self, other: object) -> bool:
11✔
37
        if isinstance(other, TaskInfo):
11✔
38
            return self.id == other.id
11✔
39

UNCOV
40
        return NotImplemented
×
41

42
    def __hash__(self) -> int:
11✔
43
        return hash(self.id)
11✔
44

45
    def __repr__(self) -> str:
11✔
46
        return f"{self.__class__.__name__}(id={self.id!r}, name={self.name!r})"
11✔
47

48
    def has_pending_cancellation(self) -> bool:
11✔
49
        """
50
        Return ``True`` if the task has a cancellation pending, ``False`` otherwise.
51

52
        """
53
        return False
×
54

55

56
def get_current_task() -> TaskInfo:
11✔
57
    """
58
    Return the current task.
59

60
    :return: a representation of the current task
61

62
    """
63
    return get_async_backend().get_current_task()
11✔
64

65

66
def get_running_tasks() -> list[TaskInfo]:
11✔
67
    """
68
    Return a list of running tasks in the current event loop.
69

70
    :return: a list of task info objects
71

72
    """
73
    return cast("list[TaskInfo]", get_async_backend().get_running_tasks())
11✔
74

75

76
async def wait_all_tasks_blocked() -> None:
11✔
77
    """Wait until all other tasks are waiting for something."""
78
    await get_async_backend().wait_all_tasks_blocked()
11✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc