• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

galthran-wq / telegram-scraper-service / 25725734842

12 May 2026 09:27AM UTC coverage: 67.568%. First build
25725734842

Pull #6

github

web-flow
Merge f8d8dc4a8 into e4cc589f0
Pull Request #6: feat: hot-add sessions via StringSession + pool-status endpoint

139 of 184 new or added lines in 6 files covered. (75.54%)

525 of 777 relevant lines covered (67.57%)

0.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.26
/src/core/session_pool.py
1
import asyncio
1✔
2
import contextlib
1✔
3
import os
1✔
4
import random
1✔
5
from collections import deque
1✔
6
from datetime import UTC, datetime
1✔
7
from itertools import cycle
1✔
8
from pathlib import Path
1✔
9
from urllib.parse import urlparse
1✔
10

11
import structlog
1✔
12
from telethon import TelegramClient
1✔
13
from telethon.sessions import StringSession
1✔
14

15
from src.config import settings
1✔
16

17
# File suffix of plain-text files that hold a Telethon StringSession.
STRING_SESSION_EXT = ".stringsession"
# File suffix of Telethon's SQLite session files.
SQLITE_SESSION_EXT = ".session"
# Maximum number of recent evictions SessionPool remembers for status().
EVICTION_LOG_SIZE = 20

# Module-wide structured logger.
logger = structlog.get_logger()
22

23

24
def _parse_proxy(proxy_url: str) -> tuple[object, ...]:
    """Convert a proxy URL into the tuple passed as TelegramClient's ``proxy``.

    Supported schemes: ``http`` and ``https`` (both mapped to
    ``ProxyType.HTTP``), ``socks5`` and ``socks5h`` (``ProxyType.SOCKS5``),
    and ``socks4`` (``ProxyType.SOCKS4``).

    The result is ``(proxy_type, host, port, True, username, password)``.
    The fourth element is always True; it is presumably the remote-DNS
    (``rdns``) flag of the Telethon/python-socks tuple format.

    Credentials are percent-decoded, because characters such as ``@`` or
    ``:`` must be percent-encoded inside a URL's userinfo part and
    ``urlparse`` does not decode them.

    Raises:
        ValueError: for an unsupported scheme, a missing host or port, or an
            invalid port number (raised by ``urlparse(...).port``).
    """
    from urllib.parse import unquote

    from python_socks import ProxyType

    parsed = urlparse(proxy_url)
    scheme = parsed.scheme.lower()
    scheme_map = {
        "http": ProxyType.HTTP,
        "https": ProxyType.HTTP,
        "socks5": ProxyType.SOCKS5,
        "socks5h": ProxyType.SOCKS5,
        "socks4": ProxyType.SOCKS4,
    }
    proxy_type = scheme_map.get(scheme)
    if proxy_type is None:
        raise ValueError(f"Unsupported proxy scheme: {scheme}")
    if not parsed.hostname or not parsed.port:
        # Never echo credentials back: this message may end up in logs.
        redacted = parsed._replace(netloc=parsed.netloc.rpartition("@")[2]).geturl()
        raise ValueError(f"Proxy URL must include host and port: {redacted}")
    username = unquote(parsed.username) if parsed.username is not None else None
    password = unquote(parsed.password) if parsed.password is not None else None
    return (proxy_type, parsed.hostname, parsed.port, True, username, password)
41

42

43
class SessionPool:
    """Round-robin pool of connected, authorized Telethon clients.

    Sessions come from ``settings.sessions_dir``: SQLite ``.session`` files and
    plain-text ``.stringsession`` files.

    New sessions can be added at runtime in two ways:
    - :meth:`add_string_session` writes a file and connects it immediately.
    - :meth:`rescan` picks up files that appeared on disk. A background task
      runs it every ``settings.sessions_rescan_interval`` seconds when that
      value is positive.

    Pool state is updated under ``self._lock``. Network I/O (connect,
    ``get_me``, disconnect) runs outside the lock.
    """

    def __init__(self) -> None:
        # Clients handed out by get_next().
        self._clients: list[TelegramClient] = []
        # Round-robin iterator over _clients; None while the pool is empty.
        self._cycle: cycle[TelegramClient] | None = None
        # Serializes updates to the pool state from concurrent coroutines.
        self._lock = asyncio.Lock()
        # Proxy tuple passed to every TelegramClient (from settings.proxy).
        self._proxy: tuple[object, ...] | None = None
        # Client -> session path, in the form returned by _discover_session_paths().
        self._session_paths: dict[TelegramClient, str] = {}
        # Bounded log of recent evictions, reported by status().
        self._evictions: deque[dict[str, str]] = deque(maxlen=EVICTION_LOG_SIZE)
        # Background rescan task started by init() when rescanning is enabled.
        self._rescan_task: asyncio.Task[None] | None = None
        # Paths currently being connected by rescan()/add_string_session().
        # Concurrent callers skip these so one session is never connected twice.
        self._inflight_rescan_paths: set[str] = set()

    async def init(self) -> None:
        """Configure the proxy, load sessions from disk and start the rescan loop.

        A missing sessions directory is logged and nothing is loaded. The
        rescan loop is still started (when enabled), so session files that
        appear later are picked up.
        """
        if settings.proxy:
            self._proxy = _parse_proxy(settings.proxy)
            logger.info("proxy_configured", scheme=settings.proxy.split("://")[0])

        sessions_dir = Path(settings.sessions_dir)
        if not sessions_dir.exists():
            logger.warning("sessions_dir_not_found", path=str(sessions_dir))
        else:
            session_paths = self._discover_session_paths(sessions_dir)
            if not session_paths:
                logger.warning("no_sessions_found", path=str(sessions_dir))
            for session_path in session_paths:
                await self._load(session_path)

        if self._clients:
            # Shuffle so concurrent service instances do not all start on the
            # same session.
            random.shuffle(self._clients)
            self._cycle = cycle(self._clients)
            logger.info("session_pool_ready", count=len(self._clients))

        if settings.sessions_rescan_interval > 0 and self._rescan_task is None:
            self._rescan_task = asyncio.create_task(self._rescan_loop())

    def _discover_session_paths(self, sessions_dir: Path) -> list[str]:
        """Return the sorted session paths found in *sessions_dir*.

        SQLite sessions are returned without their ``.session`` suffix. This
        is presumably because Telethon re-appends it when given a bare name.
        String sessions keep their full ``.stringsession`` path.
        """
        sqlite_paths = [str(p.with_suffix("")) for p in sessions_dir.glob(f"*{SQLITE_SESSION_EXT}")]
        string_paths = [str(p) for p in sessions_dir.glob(f"*{STRING_SESSION_EXT}")]
        return sorted(sqlite_paths + string_paths)

    async def _load(self, session_path: str) -> TelegramClient | None:
        """Connect *session_path* and append it to the pool (used by init()).

        Takes no lock and does not rebuild the round-robin cycle; init() builds
        the cycle once every session is loaded.

        Returns:
            The client, or None if it could not be connected and authorized.
        """
        client = await self._connect(session_path)
        if client is None:
            return None
        self._clients.append(client)
        self._session_paths[client] = session_path
        logger.info("session_loaded", session=Path(session_path).name)
        return client

    async def _connect(self, session_path: str) -> TelegramClient | None:
        """Open *session_path* and return a connected, authorized client.

        ``.stringsession`` paths are read and decoded as a StringSession. Any
        other path is passed to TelegramClient unchanged.

        Every failure is logged and yields None, so one bad session never
        aborts init() or rescan(). Failures include:
        - an unreadable, empty or undecodable file,
        - a client construction error,
        - a connection error,
        - an unauthorized session.
        """
        name = Path(session_path).name
        session: StringSession | str
        if session_path.endswith(STRING_SESSION_EXT):
            try:
                session_str = Path(session_path).read_text(encoding="utf-8").strip()
            except OSError as e:
                logger.warning("string_session_read_failed", session=name, error=str(e))
                return None
            if not session_str:
                logger.warning("string_session_empty", session=name)
                return None
            try:
                session = StringSession(session_str)
            except Exception as e:
                # Decoding a malformed string raises inside StringSession.
                # Skip just this session instead of propagating.
                logger.warning("string_session_invalid", session=name, error=str(e))
                return None
        else:
            session = session_path

        try:
            client = TelegramClient(session, settings.telegram_api_id, settings.telegram_api_hash, proxy=self._proxy)
        except Exception as e:
            logger.warning("session_connect_failed", session=name, error=str(e))
            return None
        try:
            await client.connect()
            if not await client.is_user_authorized():
                logger.warning("session_not_authorized", session=name)
                await client.disconnect()
                return None
        except Exception as e:
            logger.warning("session_connect_failed", session=name, error=str(e))
            with contextlib.suppress(Exception):
                await client.disconnect()
            return None
        return client

    async def add_string_session(self, name: str, session_str: str) -> dict[str, object]:
        """Persist *session_str* as ``<name>.stringsession`` and add it to the pool.

        Returns:
            ``{"name", "phone", "user_id"}`` on success. ``phone`` and
            ``user_id`` are None if ``get_me`` fails.

            Otherwise ``{"error": message}``, when any of these holds:
            - the name is invalid or the string is empty;
            - the session is already loaded or its file already exists;
            - the file cannot be written;
            - the session does not connect and authorize (the file is then
              deleted again).
        """
        if not name or "/" in name or "\\" in name or name.startswith("."):
            return {"error": "invalid name"}
        session_str = session_str.strip()
        if not session_str:
            return {"error": "empty session string"}

        sessions_dir = Path(settings.sessions_dir)
        sessions_dir.mkdir(parents=True, exist_ok=True)
        dest = sessions_dir / f"{name}{STRING_SESSION_EXT}"
        tmp = dest.with_suffix(STRING_SESSION_EXT + ".tmp")
        key = str(dest)

        async with self._lock:
            if key in self._session_paths.values() or key in self._inflight_rescan_paths:
                return {"error": f"session '{name}' already loaded"}
            if dest.exists():
                return {"error": f"session file '{dest.name}' already exists"}
            try:
                # Write-then-rename: the final path only ever holds a complete
                # file, and the .tmp name does not match the discovery glob.
                tmp.write_text(session_str + "\n", encoding="utf-8")
                os.replace(tmp, dest)
            except OSError as e:
                with contextlib.suppress(OSError):
                    tmp.unlink()
                logger.warning("string_session_write_failed", session=dest.name, error=str(e))
                return {"error": f"failed to write session file: {e}"}
            # Claim the path so a concurrent rescan() skips it while we connect.
            self._inflight_rescan_paths.add(key)

        try:
            client = await self._connect(key)
            if client is None:
                with contextlib.suppress(OSError):
                    dest.unlink()
                return {"error": "session failed to authorize"}
            try:
                me = await client.get_me()
                phone = getattr(me, "phone", None)
                user_id = getattr(me, "id", None)
            except Exception:
                logger.warning("get_me_failed", session=name)
                phone = None
                user_id = None
            # Register BEFORE the claim is released in `finally`. If the claim
            # were dropped first, a rescan() taking the lock in between would
            # see the file as unknown and connect the same session again.
            async with self._lock:
                self._clients.append(client)
                self._session_paths[client] = key
                self._cycle = cycle(self._clients)
                logger.info("session_added", session=dest.name, count=len(self._clients))
        finally:
            async with self._lock:
                self._inflight_rescan_paths.discard(key)

        return {"name": name, "phone": phone, "user_id": user_id}

    async def rescan(self) -> int:
        """Connect session files on disk that are not in the pool yet.

        Paths already loaded, or currently being connected by another
        rescan()/add_string_session() call, are skipped. Sessions that fail
        to connect are logged by _connect() and skipped.

        Returns:
            The number of clients added to the pool.
        """
        sessions_dir = Path(settings.sessions_dir)
        if not sessions_dir.exists():
            return 0

        async with self._lock:
            known = set(self._session_paths.values())
            discovered = set(self._discover_session_paths(sessions_dir))
            new_paths = sorted(discovered - known - self._inflight_rescan_paths)
            # Claim the paths so concurrent callers leave them alone.
            self._inflight_rescan_paths.update(new_paths)

        connected: list[tuple[str, TelegramClient]] = []
        try:
            for session_path in new_paths:
                client = await self._connect(session_path)
                if client is not None:
                    connected.append((session_path, client))
            if connected:
                # Register before the claims are dropped in `finally`, so no
                # other caller ever sees these paths as both unknown and
                # unclaimed.
                async with self._lock:
                    for session_path, client in connected:
                        self._clients.append(client)
                        self._session_paths[client] = session_path
                        logger.info("session_loaded", session=Path(session_path).name)
                    self._cycle = cycle(self._clients)
                    logger.info("session_pool_rescanned", added=len(connected), total=len(self._clients))
        finally:
            async with self._lock:
                self._inflight_rescan_paths.difference_update(new_paths)

        return len(connected)

    async def _rescan_loop(self) -> None:
        """Call rescan() every ``settings.sessions_rescan_interval`` seconds until cancelled.

        Errors other than cancellation are logged and the loop keeps running.
        """
        while True:
            try:
                await asyncio.sleep(settings.sessions_rescan_interval)
                await self.rescan()
            except asyncio.CancelledError:
                raise
            except Exception:
                logger.exception("rescan_loop_error")

    async def reconnect(self, client: TelegramClient) -> bool:
        """Drop and re-open *client*'s connection.

        This method does not remove the client from the pool.

        Returns:
            True if the session is authorized again. False when the client is
            not managed by this pool, when reconnecting fails, or when the
            session is no longer authorized. In the last two cases the
            connection is closed so it does not linger.
        """
        session_path = self._session_paths.get(client)
        if not session_path:
            return False
        name = Path(session_path).name
        with contextlib.suppress(Exception):
            await client.disconnect()
        try:
            await client.connect()
            if await client.is_user_authorized():
                logger.info("session_reconnected", session=name)
                return True
            logger.warning("session_not_authorized", session=name)
        except Exception as e:
            logger.warning("reconnect_failed", session=name, error=str(e))
        with contextlib.suppress(Exception):
            await client.disconnect()
        return False

    async def close(self) -> None:
        """Stop the rescan loop and disconnect every pooled client.

        The pool is emptied under the lock before disconnecting, so
        get_next() stops handing out clients at once. A failing disconnect is
        logged and does not stop the remaining clients from being closed.
        """
        if self._rescan_task:
            self._rescan_task.cancel()
            with contextlib.suppress(asyncio.CancelledError, Exception):
                await self._rescan_task
            self._rescan_task = None

        async with self._lock:
            closing = [(client, self._session_paths.get(client, "")) for client in self._clients]
            self._clients.clear()
            self._session_paths.clear()
            self._cycle = None

        for client, session_path in closing:
            try:
                await client.disconnect()
            except Exception as e:
                logger.warning(
                    "session_disconnect_failed",
                    session=Path(session_path).name if session_path else "unknown",
                    error=str(e),
                )

    async def get_next(self) -> TelegramClient | None:
        """Return the next client in round-robin order, or None if the pool is empty."""
        async with self._lock:
            if not self._cycle:
                return None
            return next(self._cycle)

    async def remove_client(self, client: TelegramClient, reason: str = "unknown") -> None:
        """Evict *client* from the pool, record the eviction, then disconnect it.

        Does nothing if *client* is not in the pool. The eviction (timestamp,
        session file name, *reason*) is kept in the bounded log reported by
        status().
        """
        async with self._lock:
            if client not in self._clients:
                return
            session_path = self._session_paths.get(client, "")
            session_name = Path(session_path).name if session_path else "unknown"
            self._clients.remove(client)
            self._session_paths.pop(client, None)
            # Rebuild the cycle so the evicted client is never handed out again.
            self._cycle = cycle(self._clients) if self._clients else None
            remaining = len(self._clients)
            self._evictions.append(
                {
                    "ts": datetime.now(UTC).isoformat(),
                    "session": session_name,
                    "reason": reason,
                }
            )
        with contextlib.suppress(Exception):
            await client.disconnect()
        logger.warning("session_removed", session=session_name, reason=reason, remaining=remaining)

    def status(self) -> dict[str, object]:
        """Return a snapshot of the pool's state.

        The snapshot contains:
        - ``alive``: number of pooled clients;
        - ``configured``: number of session files currently on disk;
        - per-session connection flags;
        - recent evictions;
        - the rescan interval.
        """
        sessions_dir = Path(settings.sessions_dir)
        configured = len(self._discover_session_paths(sessions_dir)) if sessions_dir.exists() else 0
        sessions = []
        for client, path in self._session_paths.items():
            try:
                connected = client.is_connected()
            except Exception:
                connected = False
            sessions.append({"name": Path(path).name, "connected": bool(connected)})
        return {
            "alive": len(self._clients),
            "configured": configured,
            "sessions": sessions,
            "recent_evictions": list(self._evictions),
            "rescan_interval": settings.sessions_rescan_interval,
        }

    @property
    def available(self) -> bool:
        """True when at least one client is in the pool."""
        return len(self._clients) > 0
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc