• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

galthran-wq / telegram-scraper-service / 25725455484

12 May 2026 09:21AM UTC coverage: 66.975%. First build
25725455484

Pull #6

github

web-flow
Merge a334e7ce7 into e4cc589f0
Pull Request #6: feat: hot-add sessions via StringSession + pool-status endpoint

121 of 164 new or added lines in 6 files covered. (73.78%)

507 of 757 relevant lines covered (66.97%)

0.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.06
/src/core/session_pool.py
1
import asyncio
1✔
2
import contextlib
1✔
3
import os
1✔
4
import random
1✔
5
from collections import deque
1✔
6
from datetime import UTC, datetime
1✔
7
from itertools import cycle
1✔
8
from pathlib import Path
1✔
9
from urllib.parse import urlparse
1✔
10

11
import structlog
1✔
12
from telethon import TelegramClient
1✔
13
from telethon.sessions import StringSession
1✔
14

15
from src.config import settings
1✔
16

17
logger = structlog.get_logger()
1✔
18

19
STRING_SESSION_EXT = ".stringsession"
1✔
20
SQLITE_SESSION_EXT = ".session"
1✔
21
EVICTION_LOG_SIZE = 20
1✔
22

23

24
def _parse_proxy(proxy_url: str) -> tuple[object, ...]:
1✔
25
    from python_socks import ProxyType
×
26

27
    parsed = urlparse(proxy_url)
×
28
    scheme = parsed.scheme.lower()
×
NEW
29
    scheme_map = {
×
30
        "http": ProxyType.HTTP,
31
        "https": ProxyType.HTTP,
32
        "socks5": ProxyType.SOCKS5,
33
        "socks4": ProxyType.SOCKS4,
34
    }
35
    proxy_type = scheme_map.get(scheme)
×
36
    if proxy_type is None:
×
37
        raise ValueError(f"Unsupported proxy scheme: {scheme}")
×
38
    if not parsed.hostname or not parsed.port:
×
39
        raise ValueError(f"Proxy URL must include host and port: {proxy_url}")
×
40
    return (proxy_type, parsed.hostname, parsed.port, True, parsed.username, parsed.password)
×
41

42

43
class SessionPool:
1✔
44
    def __init__(self) -> None:
1✔
45
        self._clients: list[TelegramClient] = []
1✔
46
        self._cycle: cycle[TelegramClient] | None = None
1✔
47
        self._lock = asyncio.Lock()
1✔
48
        self._proxy: tuple[object, ...] | None = None
1✔
49
        self._session_paths: dict[TelegramClient, str] = {}
1✔
50
        self._evictions: deque[dict[str, str]] = deque(maxlen=EVICTION_LOG_SIZE)
1✔
51
        self._rescan_task: asyncio.Task[None] | None = None
1✔
52

53
    async def init(self) -> None:
1✔
54
        if settings.proxy:
1✔
55
            self._proxy = _parse_proxy(settings.proxy)
×
56
            logger.info("proxy_configured", scheme=settings.proxy.split("://")[0])
×
57

58
        sessions_dir = Path(settings.sessions_dir)
1✔
59
        if not sessions_dir.exists():
1✔
60
            logger.warning("sessions_dir_not_found", path=str(sessions_dir))
×
61
            return
×
62

63
        session_paths = self._discover_session_paths(sessions_dir)
1✔
64
        if not session_paths:
1✔
65
            logger.warning("no_sessions_found", path=str(sessions_dir))
1✔
66

67
        for session_path in session_paths:
1✔
NEW
68
            await self._load(session_path)
×
69

70
        if self._clients:
1✔
71
            random.shuffle(self._clients)
×
72
            self._cycle = cycle(self._clients)
×
73
            logger.info("session_pool_ready", count=len(self._clients))
×
74

75
        if settings.sessions_rescan_interval > 0:
1✔
76
            self._rescan_task = asyncio.create_task(self._rescan_loop())
1✔
77

78
    def _discover_session_paths(self, sessions_dir: Path) -> list[str]:
1✔
79
        sqlite_paths = [str(p.with_suffix("")) for p in sessions_dir.glob(f"*{SQLITE_SESSION_EXT}")]
1✔
80
        string_paths = [str(p) for p in sessions_dir.glob(f"*{STRING_SESSION_EXT}")]
1✔
81
        return sorted(sqlite_paths + string_paths)
1✔
82

83
    async def _load(self, session_path: str) -> TelegramClient | None:
1✔
NEW
84
        client = await self._connect(session_path)
×
NEW
85
        if not client:
×
NEW
86
            return None
×
NEW
87
        self._clients.append(client)
×
NEW
88
        self._session_paths[client] = session_path
×
NEW
89
        logger.info("session_loaded", session=Path(session_path).name)
×
NEW
90
        return client
×
91

92
    async def _connect(self, session_path: str) -> TelegramClient | None:
1✔
93
        if session_path.endswith(STRING_SESSION_EXT):
1✔
94
            try:
1✔
95
                session_str = Path(session_path).read_text(encoding="utf-8").strip()
1✔
NEW
96
            except OSError as e:
×
NEW
97
                logger.warning("string_session_read_failed", session=Path(session_path).name, error=str(e))
×
NEW
98
                return None
×
99
            if not session_str:
1✔
NEW
100
                logger.warning("string_session_empty", session=Path(session_path).name)
×
NEW
101
                return None
×
102
            session: StringSession | str = StringSession(session_str)
1✔
103
        else:
NEW
104
            session = session_path
×
105

106
        client = TelegramClient(session, settings.telegram_api_id, settings.telegram_api_hash, proxy=self._proxy)
1✔
107
        try:
1✔
108
            await client.connect()
1✔
109
            if not await client.is_user_authorized():
1✔
110
                logger.warning("session_not_authorized", session=Path(session_path).name)
1✔
111
                await client.disconnect()
1✔
112
                return None
1✔
NEW
113
        except Exception as e:
×
NEW
114
            logger.warning("session_connect_failed", session=Path(session_path).name, error=str(e))
×
NEW
115
            with contextlib.suppress(Exception):
×
NEW
116
                await client.disconnect()
×
NEW
117
            return None
×
118
        return client
1✔
119

120
    async def add_string_session(self, name: str, session_str: str) -> dict[str, object]:
1✔
121
        if not name or "/" in name or "\\" in name or name.startswith("."):
1✔
NEW
122
            return {"error": "invalid name"}
×
123
        session_str = session_str.strip()
1✔
124
        if not session_str:
1✔
NEW
125
            return {"error": "empty session string"}
×
126

127
        sessions_dir = Path(settings.sessions_dir)
1✔
128
        sessions_dir.mkdir(parents=True, exist_ok=True)
1✔
129
        dest = sessions_dir / f"{name}{STRING_SESSION_EXT}"
1✔
130
        tmp = dest.with_suffix(STRING_SESSION_EXT + ".tmp")
1✔
131

132
        async with self._lock:
1✔
133
            if str(dest) in self._session_paths.values():
1✔
NEW
134
                return {"error": f"session '{name}' already loaded"}
×
135
            if dest.exists():
1✔
NEW
136
                return {"error": f"session file '{dest.name}' already exists"}
×
137

138
            tmp.write_text(session_str + "\n", encoding="utf-8")
1✔
139
            os.replace(tmp, dest)
1✔
140

141
            client = await self._connect(str(dest))
1✔
142
            if not client:
1✔
143
                with contextlib.suppress(OSError):
1✔
144
                    dest.unlink()
1✔
145
                return {"error": "session failed to authorize"}
1✔
146

147
            try:
1✔
148
                me = await client.get_me()
1✔
149
                phone = getattr(me, "phone", None)
1✔
150
                user_id = getattr(me, "id", None)
1✔
NEW
151
            except Exception as e:
×
NEW
152
                logger.warning("get_me_failed", session=name, error=str(e))
×
NEW
153
                phone = None
×
NEW
154
                user_id = None
×
155

156
            self._clients.append(client)
1✔
157
            self._session_paths[client] = str(dest)
1✔
158
            self._cycle = cycle(self._clients)
1✔
159
            logger.info("session_added", session=dest.name, count=len(self._clients))
1✔
160

161
        return {"name": name, "phone": phone, "user_id": user_id}
1✔
162

163
    async def rescan(self) -> int:
1✔
164
        sessions_dir = Path(settings.sessions_dir)
1✔
165
        if not sessions_dir.exists():
1✔
NEW
166
            return 0
×
167

168
        async with self._lock:
1✔
169
            known = set(self._session_paths.values())
1✔
170
            discovered = set(self._discover_session_paths(sessions_dir))
1✔
171
            new_paths = sorted(discovered - known)
1✔
172
            added = 0
1✔
173
            for session_path in new_paths:
1✔
174
                client = await self._connect(session_path)
1✔
175
                if not client:
1✔
NEW
176
                    continue
×
177
                self._clients.append(client)
1✔
178
                self._session_paths[client] = session_path
1✔
179
                added += 1
1✔
180
                logger.info("session_loaded", session=Path(session_path).name)
1✔
181
            if added:
1✔
182
                self._cycle = cycle(self._clients)
1✔
183
                logger.info("session_pool_rescanned", added=added, total=len(self._clients))
1✔
184
        return added
1✔
185

186
    async def _rescan_loop(self) -> None:
1✔
NEW
187
        while True:
×
NEW
188
            try:
×
NEW
189
                await asyncio.sleep(settings.sessions_rescan_interval)
×
NEW
190
                await self.rescan()
×
NEW
191
            except asyncio.CancelledError:
×
NEW
192
                raise
×
NEW
193
            except Exception:
×
NEW
194
                logger.exception("rescan_loop_error")
×
195

196
    async def reconnect(self, client: TelegramClient) -> bool:
1✔
197
        session_path = self._session_paths.get(client)
×
198
        if not session_path:
×
199
            return False
×
NEW
200
        with contextlib.suppress(Exception):
×
201
            await client.disconnect()
×
202
        try:
×
203
            await client.connect()
×
204
            if await client.is_user_authorized():
×
205
                logger.info("session_reconnected", session=Path(session_path).name)
×
206
                return True
×
207
        except Exception as e:
×
208
            logger.warning("reconnect_failed", session=Path(session_path).name, error=str(e))
×
209
        return False
×
210

211
    async def close(self) -> None:
1✔
212
        if self._rescan_task:
1✔
213
            self._rescan_task.cancel()
1✔
214
            with contextlib.suppress(asyncio.CancelledError, Exception):
1✔
215
                await self._rescan_task
1✔
216
            self._rescan_task = None
1✔
217
        for client in self._clients:
1✔
218
            await client.disconnect()
×
219
        self._clients.clear()
1✔
220
        self._session_paths.clear()
1✔
221
        self._cycle = None
1✔
222

223
    async def get_next(self) -> TelegramClient | None:
1✔
224
        async with self._lock:
1✔
225
            if not self._cycle:
1✔
226
                return None
×
227
            return next(self._cycle)
1✔
228

229
    async def remove_client(self, client: TelegramClient, reason: str = "unknown") -> None:
1✔
230
        async with self._lock:
1✔
231
            if client not in self._clients:
1✔
232
                return
×
233
            session_path = self._session_paths.get(client, "")
1✔
234
            session_name = Path(session_path).name if session_path else "unknown"
1✔
235
            self._clients.remove(client)
1✔
236
            self._session_paths.pop(client, None)
1✔
237
            if self._clients:
1✔
238
                self._cycle = cycle(self._clients)
×
239
            else:
240
                self._cycle = None
1✔
241
            remaining = len(self._clients)
1✔
242
            self._evictions.append(
1✔
243
                {
244
                    "ts": datetime.now(UTC).isoformat(),
245
                    "session": session_name,
246
                    "reason": reason,
247
                }
248
            )
249
        with contextlib.suppress(Exception):
1✔
250
            await client.disconnect()
1✔
251
        logger.warning("session_removed", session=session_name, reason=reason, remaining=remaining)
1✔
252

253
    def status(self) -> dict[str, object]:
1✔
254
        sessions_dir = Path(settings.sessions_dir)
1✔
255
        configured = len(self._discover_session_paths(sessions_dir)) if sessions_dir.exists() else 0
1✔
256
        sessions = []
1✔
257
        for client, path in self._session_paths.items():
1✔
258
            try:
1✔
259
                connected = client.is_connected()
1✔
NEW
260
            except Exception:
×
NEW
261
                connected = False
×
262
            sessions.append({"name": Path(path).name, "connected": bool(connected)})
1✔
263
        return {
1✔
264
            "alive": len(self._clients),
265
            "configured": configured,
266
            "sessions": sessions,
267
            "recent_evictions": list(self._evictions),
268
            "rescan_interval": settings.sessions_rescan_interval,
269
        }
270

271
    @property
1✔
272
    def available(self) -> bool:
1✔
273
        return len(self._clients) > 0
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc