• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Morry98 / fastapi-task-manager / 22859984186

09 Mar 2026 03:07PM UTC coverage: 97.577%. First build
22859984186

Pull #2

github

web-flow
Merge cbce4c267 into d04f85da2
Pull Request #2: Improve task management with Redis Streams, leader election and management API

3182 of 3264 new or added lines in 32 files covered. (97.49%)

3342 of 3425 relevant lines covered (97.58%)

4.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.97
/src/fastapi_task_manager/runner.py
1
import asyncio
5✔
2
import contextlib
5✔
3
import logging
5✔
4
from typing import TYPE_CHECKING
5✔
5

6
from redis.asyncio import Redis
5✔
7

8
from fastapi_task_manager.coordinator import Coordinator
5✔
9
from fastapi_task_manager.leader_election import LeaderElector
5✔
10
from fastapi_task_manager.reconciler import Reconciler
5✔
11
from fastapi_task_manager.redis_keys import RedisKeyBuilder
5✔
12
from fastapi_task_manager.schema.worker_identity import WorkerIdentity
5✔
13
from fastapi_task_manager.statistics import StatisticsStorage
5✔
14
from fastapi_task_manager.stream_consumer import StreamConsumer
5✔
15

16
if TYPE_CHECKING:
17
    from fastapi_task_manager import TaskManager
18

19
# Module-level logger shared by every Runner instance.
logger = logging.getLogger("fastapi.task-manager.runner")


class Runner:
    """Task runner using Redis Streams with leader election.

    The Runner is responsible for executing scheduled tasks. It uses Redis Streams
    with leader election for task scheduling and consumer groups for execution.

    ``start()`` builds the leader elector, stream consumer, coordinator and
    reconciler and stores the background tasks they return; ``stop()`` shuts
    them down again and drops the references so the runner can be restarted.
    """

    def __init__(
        self,
        redis_client: Redis,
        concurrent_tasks: int,
        task_manager: "TaskManager",
    ):
        """Store the collaborators; no Redis traffic happens here.

        Args:
            redis_client: Async Redis client shared by all components.
            concurrent_tasks: Initial value of the semaphore handed to the
                stream consumer.
            task_manager: Owning task manager; its ``config`` supplies the
                worker service name, Redis key prefix and statistics settings.
        """
        # Use WorkerIdentity for traceable worker identification
        self._worker = WorkerIdentity(task_manager.config.worker_service_name)
        self._uuid = self._worker.redis_safe_id
        # Initialize the key builder for centralized key construction
        self._key_builder = RedisKeyBuilder(task_manager.config.redis_key_prefix)
        self._redis_client = redis_client
        self._semaphore = asyncio.Semaphore(concurrent_tasks)
        self._task_manager = task_manager
        # Initialize statistics storage with Redis Streams for correlated entries
        self._statistics = StatisticsStorage(
            redis_client=redis_client,
            max_entries=task_manager.config.statistics_history_runs,
            ttl_seconds=task_manager.config.statistics_redis_expiration,
        )

        # Stream mode components (initialized lazily in start())
        self._leader_elector: LeaderElector | None = None
        self._coordinator: Coordinator | None = None
        self._consumer: StreamConsumer | None = None
        self._reconciler: Reconciler | None = None
        self._coordinator_task: asyncio.Task | None = None
        self._consumer_task: asyncio.Task | None = None
        self._reconciler_task: asyncio.Task | None = None

    # ---------------------------------------------------------------------------
    # Properties exposed for the management API (health endpoint)
    # ---------------------------------------------------------------------------

    @property
    def worker_id(self) -> str:
        """Return the human-readable worker short ID."""
        return self._worker.short_id

    @property
    def worker_started_at(self) -> str:
        """Return the ISO timestamp when this worker was initialized."""
        return self._worker.started_at

    @property
    def is_leader(self) -> bool:
        """Return whether this worker currently holds leadership.

        Always ``False`` while no leader elector exists, i.e. before
        ``start()`` and after ``stop()``.
        """
        if self._leader_elector is None:
            return False
        return self._leader_elector.is_leader

    # ---------------------------------------------------------------------------
    # Lifecycle
    # ---------------------------------------------------------------------------

    async def start(self) -> None:
        """Start the runner with Redis Streams.

        Logs a warning and returns if a consumer task already exists.

        Raises:
            ConnectionError: If the Redis ping raises or returns a falsy value.
        """
        # Check if already running
        if self._consumer_task:
            msg = "Runner is already running."
            logger.warning(msg)
            return

        # Verify Redis connection
        try:
            pong = await self._redis_client.ping()  # ty: ignore[invalid-await]
        except Exception as e:
            msg = f"Redis ping failed: {e!r}"
            raise ConnectionError(msg) from e
        if not pong:
            msg = "Redis ping returned falsy response"
            raise ConnectionError(msg)

        config = self._task_manager.config

        # Initialize leader elector
        self._leader_elector = LeaderElector(
            redis_client=self._redis_client,
            key_builder=self._key_builder,
            worker_identity=self._worker,
            heartbeat_interval=config.leader_heartbeat_interval,
        )

        # Initialize stream consumer
        self._consumer = StreamConsumer(
            redis_client=self._redis_client,
            key_builder=self._key_builder,
            worker_identity=self._worker,
            task_manager=self._task_manager,
            semaphore=self._semaphore,
            statistics=self._statistics,
        )

        # Initialize coordinator
        self._coordinator = Coordinator(
            redis_client=self._redis_client,
            key_builder=self._key_builder,
            leader_elector=self._leader_elector,
            task_manager=self._task_manager,
        )

        # Initialize reconciler (leader only, but all workers start it;
        # the reconciler checks leadership internally before acting)
        self._reconciler = Reconciler(
            redis_client=self._redis_client,
            key_builder=self._key_builder,
            leader_elector=self._leader_elector,
            task_manager=self._task_manager,
        )

        # Setup consumer groups for both high and low priority streams
        await self._consumer.setup_consumer_groups()

        # Start coordinator (handles leader election internally)
        self._coordinator_task = await self._coordinator.start()

        # Start consumer (runs on all workers, recovers pending messages on startup)
        self._consumer_task = await self._consumer.start()

        # Start reconciler if enabled
        if self._reconciler:
            self._reconciler_task = await self._reconciler.start()

        logger.info("Runner started. Worker: %s", self._worker)

    async def stop(self) -> None:
        """Stop the runner and all stream mode components gracefully.

        Logs a warning and returns if neither the consumer nor the coordinator
        task exists. Otherwise every teardown stage is attempted even when an
        earlier one fails, so leadership is always released and the internal
        references are always cleared.

        Raises:
            Exception: The first error raised by any teardown stage, re-raised
                after all stages have run. Later errors are only logged.
        """
        if not self._consumer_task and not self._coordinator_task:
            msg = "Runner is not running."
            logger.warning(msg)
            return

        # Order matters: stop recovery actions first, then scheduling of new
        # tasks, then processing of tasks.
        stages = (
            ("reconciler", self._reconciler, "_reconciler_task"),
            ("coordinator", self._coordinator, "_coordinator_task"),
            ("consumer", self._consumer, "_consumer_task"),
        )
        first_error: Exception | None = None

        for label, component, task_attr in stages:
            try:
                await self._stop_component(component, getattr(self, task_attr))
            except Exception as exc:
                # Keep going: a failing stage must not leave later components
                # running or the leadership lock held.
                logger.exception("Error while stopping %s", label)
                if first_error is None:
                    first_error = exc
            finally:
                setattr(self, task_attr, None)

        # Release leadership if held
        if self._leader_elector:
            try:
                await self._leader_elector.release_leadership()
            except Exception as exc:
                logger.exception("Error while releasing leadership")
                if first_error is None:
                    first_error = exc

        # Clear references
        self._coordinator = None
        self._consumer = None
        self._reconciler = None
        self._leader_elector = None

        if first_error is not None:
            raise first_error

        logger.info("Runner stopped.")

    async def _stop_component(self, component, task: asyncio.Task | None) -> None:
        """Call ``component.stop()`` and then cancel its background task.

        The task is cancelled even if ``component.stop()`` raises.
        """
        try:
            if component:
                await component.stop()
        finally:
            await self._cancel_task(task)

    @staticmethod
    async def _cancel_task(task: asyncio.Task | None) -> None:
        """Cancel *task* (if any) and wait for it to finish.

        The ``CancelledError`` produced by the cancellation is suppressed; any
        other exception stored in the task propagates to the caller.
        """
        if not task:
            return
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc