• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

idlesign / torrt / 25475794615

07 May 2026 04:16AM UTC coverage: 60.464% (-7.1%) from 67.593%
25475794615

Pull #95

github

web-flow
Merge dccd57d45 into 5d89e3750
Pull Request #95: Fix tests on Windows

2 of 3 new or added lines in 1 file covered. (66.67%)

119 existing lines in 7 files now uncovered.

1017 of 1682 relevant lines covered (60.46%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.48
/src/torrt/utils.py
1
import base64
3✔
2
import logging
3✔
3
import re
3✔
4
import threading
3✔
5
from collections.abc import Callable, Generator, Mapping
3✔
6
from datetime import UTC, datetime
3✔
7
from inspect import getfullargspec
3✔
8
from json import JSONDecodeError, dump, load
3✔
9
from pathlib import Path
3✔
10
from pkgutil import iter_modules
3✔
11
from time import time
3✔
12
from typing import TYPE_CHECKING, Any, ClassVar, Optional
3✔
13

14
from bs4 import BeautifulSoup
3✔
15
from requests import RequestException, Response, Session
3✔
16
from torrentool.api import Torrent
3✔
17
from torrentool.exceptions import BencodeDecodingError
3✔
18

19
if TYPE_CHECKING:
20
    from .base_bot import BaseBot
21
    from .base_notifier import BaseNotifier
22
    from .base_rpc import BaseRPC
23
    from .base_tracker import GenericTracker
24

25

26
# Package-wide logger used by every helper in this module.
LOGGER = logging.getLogger('torrt')

# Per-thread storage that backs GlobalParam.
_THREAD_LOCAL = threading.local()

# Extracts a http(s) hyperlink from free text, e.g. a torrent comment.
RE_LINK = re.compile(r'(?P<url>https?://[^\s]+)')

# strftime() pattern used to serialize datetimes (see PageData.to_dict()).
DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'
34

35
class HttpClient:
    """Common client to perform HTTP requests.

    Wraps a ``requests`` session that sends a browser-like ``User-agent``
    header and remembers the last response and the last error message.

    """

    # Default timeout passed to the session unless overridden per request.
    timeout: int = 10

    # Value of the `User-agent` header set on the session.
    user_agent: str = (
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36')

    def __init__(
            self,
            *,
            silence_exceptions: bool = False,
            dump_fname_tpl: str = '%(ts)s.txt',
            json: bool = False,
            tunnel: bool = True,
    ):
        """
        :param silence_exceptions: Default for request(): return None
            instead of raising on request errors.
        :param dump_fname_tpl: File name template passed to dump_contents()
            for every successful response.
        :param json: Default for request(): send and receive data as JSON.
        :param tunnel: If False, proxies are explicitly disabled for requests.

        """
        session = Session()

        session.headers.update({
            'User-agent': self.user_agent,
        })

        self.session = session
        # NB: must be a plain bool. A trailing comma here would store
        # a one-item tuple, which is always truthy and would silence
        # every request error regardless of the setting.
        self.silence_exceptions = silence_exceptions
        self.dump_fname_tpl = dump_fname_tpl
        self.json = json
        self.last_error: str = ''
        self.last_response: Response | None = None
        self.tunnel = tunnel

    def request(
            self,
            url: str,
            *,
            data: dict[str, Any] | None = None,
            referer: str = '',
            allow_redirects: bool = True,
            cookies: dict[str, str] | None = None,
            headers: dict[str, str] | None = None,
            json: bool | None = None,
            silence_exceptions: bool | None = None,
            timeout: int = 0,
            **kwargs
    ) -> Response | dict | None:
        """Performs a request: POST if there is `data` (or `files` in kwargs), GET otherwise.

        :param url: URL to address
        :param data: Data to send to URL
        :param referer: Value for `Referer` header (header is not set if empty)
        :param allow_redirects: Passed to the session as is
        :param cookies: Passed to the session as is
        :param headers: Additional headers
        :param json: Send and receive data as JSON.
            None means: use the value given to the constructor.
        :param silence_exceptions: Do not raise exceptions.
            None means: use the value given to the constructor.
        :param timeout: Override timeout. Falsy value means: use `self.timeout`.
        :param kwargs: Additional keyword arguments for the session method.

        :returns: Response object; or decoded JSON in JSON mode
            (empty dict if the body can't be decoded);
            or None if the request has failed and exceptions are silenced.

        :raises RequestException: if the request has failed and exceptions are not silenced.

        """
        LOGGER.debug(f'Fetching {url} ...')

        # Copy to prevent mutation of a caller's dictionary.
        headers = {**(headers or {})}

        r_kwargs = {
            'timeout': timeout or self.timeout,
            'cookies': cookies,
            'headers': headers,
            'allow_redirects': allow_redirects,
            **kwargs,
        }

        if referer:
            headers['Referer'] = referer

        if not self.tunnel:
            # Drop globally set tunnels settings. See toolbox.tunnel().
            r_kwargs['proxies'] = {'http': None, 'https': None}

        if json is None:
            json = self.json

        try:

            if data or r_kwargs.get('files'):

                if json:
                    r_kwargs['json'] = data
                else:
                    r_kwargs['data'] = data

                method = self.session.post

            else:
                method = self.session.get

            response = method(url, **r_kwargs)

            self.last_response = response

        except RequestException as e:

            self.last_error = f'{e}'
            LOGGER.warning(f"Failed to get response from `{url}`: {e}")

            if silence_exceptions is None:
                silence_exceptions = self.silence_exceptions

            if silence_exceptions:
                return None

            raise

        else:

            dump_contents(
                self.dump_fname_tpl,
                contents=response.content
            )

            if json:
                try:
                    response = response.json()

                except JSONDecodeError:
                    return {}

        return response
161

162

163
def encode_value(value: str, *, encoding: str = "") -> str | bytes:
3✔
164
    """Encodes a value.
165

166
    :param value:
167
    :param encoding: Encoding charset.
168

169
    """
170
    if not encoding:
×
171
        return value
×
172

173
    return value.encode(encoding)
×
174

175

176
def base64encode(string_or_bytes: str | bytes) -> bytes:
3✔
177
    """Return base64 encoded input
178

179
    :param string_or_bytes:
180

181
    """
182
    if isinstance(string_or_bytes, str):
3✔
183
        string_or_bytes = string_or_bytes.encode()
3✔
184

185
    return base64.encodebytes(string_or_bytes).decode('ascii').encode()
3✔
186

187

188
class GlobalParam:
    """Holder of global parameter values.

    Values are kept as attributes of the module's thread-local storage
    and can be used anywhere in torrt.

    """

    @staticmethod
    def set(name: str, value: Any):
        """Stores a value under a given parameter name.

        :param name: Parameter name.
        :param value: Value to store.

        """
        setattr(_THREAD_LOCAL, name, value)

    @staticmethod
    def get(name: str) -> Any:
        """Returns a value of a parameter, or None if it was not set.

        :param name: Parameter name.

        """
        return getattr(_THREAD_LOCAL, name, None)
200

201

202
def dump_contents(filename: str, contents: bytes):
    """Dumps contents into a file with a given name.

    Does nothing unless `dump_into` global parameter (target directory) is set.

    :param filename: File name template. `%(ts)s` is replaced with the current timestamp.
    :param contents: Bytes to write.

    """
    target_dir = GlobalParam.get('dump_into')

    if target_dir:
        resolved_name = filename % {'ts': time()}
        target = Path(target_dir) / resolved_name

        with target.open('wb') as out:
            out.write(contents)
220

221

222
def configure_entity(
        type_name: str,
        registry,
        alias: str,
        settings_dict: dict[str, Any] | None = None,
        *,
        before_save: Callable | None = None
) -> Optional['WithSettings']:
    """Spawns an object using given settings and saves its configuration on success.

    Returns the configured object, or None if the alias is unknown
    or the configuration test has failed.

    :param type_name: Entity type name to be used in messages.

    :param registry: Registry object to get an entity class from (by alias).

    :param alias: Entity alias.

    :param settings_dict: Settings dictionary to configure object with.

    :param before_save: Function to trigger right before configuration is saved.
        Should accept entity object as argument.

    """
    LOGGER.info(f'Configuring `{alias}` {type_name.lower()} ...')

    entity_cls = registry.get(alias)

    if entity_cls is None:
        LOGGER.error(f'{type_name} `{alias}` is unknown')
        return None

    entity = entity_cls.spawn_with_settings(settings_dict or {})

    if not entity.test_configuration():
        LOGGER.error(f'{type_name} `{alias}` configuration failed. Check your settings')
        return None

    if before_save:
        before_save(entity)

    entity.save_settings()
    LOGGER.info(f'{type_name} `{alias}` is configured')

    return entity
267

268

269
def import_classes():
    """Imports modules from `rpc`, `trackers`, `notifiers` and `bots` packages, in that order."""

    packages = ('rpc', 'trackers', 'notifiers', 'bots')

    for name in packages:
        LOGGER.debug(f'Importing {name} ...')
        import_from_path(name)
275

276

277
def import_from_path(path: str):
    """Imports every module found in a package directory located next to this file.

    It is an .egg-friendly alternative to os.listdir() walking.

    :param path: package name (directory) under torrt

    """
    package_dir = Path(__file__).parent / path

    for module_info in iter_modules([f'{package_dir}']):
        __import__(f'torrt.{path}.{module_info.name}')
286

287

288
def parse_torrent(torrent: bytes) -> Torrent | None:
    """Builds Torrent object from torrent file contents.

    Returns None (and logs an error) if contents can't be decoded.

    :param torrent: Torrent file contents.

    """
    try:
        parsed = Torrent.from_string(torrent)

    except BencodeDecodingError as e:
        LOGGER.error(f'Unable to parse torrent: {e}')
        return None

    return parsed
300

301

302
def make_soup(html: str) -> BeautifulSoup:
    """Parses html with `lxml` and returns BeautifulSoup object.

    :param html: Markup to parse.

    """
    return BeautifulSoup(markup=html, features='lxml')
309

310

311
def get_url_from_string(string: str) -> str:
    """Returns the first URL found in a string (e.g. torrent comment).

    Empty string is returned if there is no URL.

    :param string: Text to search in.

    """
    found = RE_LINK.search(string)
    return '' if found is None else found.group('url')
326

327

328
def get_iso_from_timestamp(ts: int) -> str:
3✔
329
    """Get ISO formatted string from timestamp.
330

331
    :param ts: timestamp
332

333
    """
334
    return datetime.fromtimestamp(ts, tz=UTC).isoformat(' ')
×
335

336

337
def update_dict(old_dict: dict, new_dict: dict) -> dict:
    """Updates [inplace] old dictionary with data from a new one with respect to existing values.

    Nested mappings are merged recursively, any other value is overwritten.
    If a new value is a mapping but the existing one is not (missing, None,
    a scalar, etc.), the existing value is replaced rather than merged.

    :param old_dict: Dictionary to update. It is mutated and returned.
    :param new_dict: Dictionary with the new data.

    """
    for key, val in new_dict.items():

        if isinstance(val, Mapping):
            current = old_dict.get(key)

            if not isinstance(current, Mapping):
                # Nothing to merge into: e.g. `null` or a number in a stored config.
                # Without this, the recursion would fail on item assignment.
                current = {}

            old_dict[key] = update_dict(current, val)

        else:
            old_dict[key] = val

    return old_dict
353

354

355
class PageData:
    """Represents data extracted from torrent page."""

    def __init__(self, title: str, cover: str, date_updated: datetime):
        """
        :param title: Page title.
        :param cover: Cover value as extracted from the page.
        :param date_updated: Date of update; falsy if unknown.

        """
        self.title, self.cover, self.date_updated = title, cover, date_updated

    def to_dict(self):
        """Returns page data as a dictionary.

        `date_updated` is formatted using DATETIME_FORMAT, or is None if not set.

        """
        updated = self.date_updated

        return {
            'title': self.title,
            'cover': self.cover,
            'date_updated': updated.strftime(DATETIME_FORMAT) if updated else None,
        }
370

371

372
class TorrentData:
    """Represents information about torrent."""

    def __init__(
            self,
            *,
            hash: str = '',
            name: str = '',
            url: str = '',
            url_file: str = '',
            raw: bytes = b'',
            page: PageData = None,
            parsed: Torrent = None,
    ):
        """
        :param hash: Torrent hash. If empty, `info_hash` of `parsed` is used.
        :param name: Torrent name. If empty, `name` of `parsed` is used.
        :param url: Torrent URL.
        :param url_file: Torrent file URL.
        :param raw: Raw torrent contents.
        :param page: Data extracted from torrent page.
        :param parsed: Parsed torrent object.

        """
        # Explicitly given values. They take precedence over the parsed torrent data.
        self._hash = hash
        self._name = name

        self.url = url
        self.url_file = url_file

        self.raw = raw
        self.parsed = parsed
        self.page = page
        self.params = {}

    def _get_hash(self):
        explicit = self._hash
        return explicit or getattr(self.parsed, 'info_hash', '') or ''

    def _set_hash(self, val: str):
        self._hash = val

    def _get_name(self):
        explicit = self._name
        return explicit or getattr(self.parsed, 'name', '') or ''

    def _set_name(self, val: str):
        self._name = val

    hash: str = property(_get_hash, _set_hash)
    name: str = property(_get_name, _set_name)

    def set_params(self, params: dict | None = None) -> dict | None:
        """Replaces torrent params with the given ones (empty dict for a falsy value)."""
        self.params = params if params else {}

    def to_dict(self) -> dict:
        """Returns torrent data as a dictionary. `page` is an empty dict if there is no page data."""
        page = self.page
        page_data = page.to_dict() if page else {}

        return {
            'hash': self.hash,
            'name': self.name,
            'url': self.url,
            'url_file': self.url_file,
            'page': page_data,
            'params': self.params,
        }
427

428

429
def structure_torrent_data(target_dict: dict, hash_str: str, data: TorrentData):
    """Puts torrent data, structured suitably for config storage,
    into the target dict under the given hash.

    If the data has no hash of its own, the given one is set on it first.

    :param target_dict: dictionary to update inplace
    :param hash_str: torrent identifying hash
    :param data: torrent data (e.g. from tracker page or received from RPC (see parse_torrent()))

    """
    has_own_hash = bool(data.hash)

    if not has_own_hash:
        data.hash = hash_str

    structured = data.to_dict()
    target_dict[hash_str] = structured
443

444

445
def get_torrent_from_url(url: str | None, last_updated: datetime | None = None) -> TorrentData | None:
    """Downloads torrent using a tracker handler registered for a given URL.

    Returns None if there is no suitable tracker handler
    or the handler was unable to get the torrent.

    :param url: URL to download torrent file from
    :param last_updated: torrent last updated datetime

    """
    LOGGER.debug(f'Downloading torrent file from `{url}` ...')

    tracker = TrackerObjectsRegistry.get_for_string(url)

    if not tracker:
        LOGGER.warning(f'Tracker handler for `{url}` is not registered')
        return None

    torrent_info = tracker.get_torrent(url, last_updated=last_updated)

    if torrent_info is None:
        LOGGER.warning(f'Unable to get torrent from `{url}`')
        return None

    LOGGER.debug(f'Torrent was downloaded from `{url}`')
    return torrent_info
470

471

472
def iter_rpc() -> Generator[tuple[str, 'BaseRPC'], None, None]:
    """Generator to iterate through registered RPC objects, skipping disabled ones.
        tuple - rpc_alias, rpc_object

    """
    registered = RPCObjectsRegistry.get()

    if not registered:
        LOGGER.error('No RPC objects registered, unable to proceed')
        return

    for alias, rpc in registered.items():

        if rpc.enabled:
            yield alias, rpc

        else:
            LOGGER.debug(f'RPC `{rpc.alias}` is disabled, skipped.')
490

491

492
def iter_bots() -> Generator[tuple[str, 'BaseBot'], None, None]:
    """Generator to iterate through registered bot objects.
        tuple - bot_alias, bot_object

    """
    bots = BotObjectsRegistry.get()

    if bots:
        yield from bots.items()

    else:
        LOGGER.error('No Bot objects registered, unable to proceed')
504

505

506
def iter_notifiers() -> Generator[tuple[str, 'BaseNotifier'], None, None]:
    """Generator to iterate through registered notifier objects.
        tuple - notifier_alias, notifier_object

    """
    notifiers = NotifierObjectsRegistry.get()

    if notifiers:
        yield from notifiers.items()

    else:
        LOGGER.debug('No Notifier registered. Notification skipped')
518

519

520
class WithSettings:
    """Introduces settings support for class objects.

    NB: * Settings names are taken from inheriting classes __init__() methods.
        * __init__() method MUST use keyword arguments only.
        * Inheriting classes MUST save settings under object properties with the same name as in __init__().

    """
    # Name used to identify the object, e.g. in the configuration file.
    alias: str = None

    # Name of the configuration file section the settings are saved under.
    config_entry_name: str = None
    settings: ClassVar[dict[str, Any]] = {}

    def __init__(self, **kwargs):
        pass

    def __str__(self) -> str:
        # __str__() must return a string: fall back to the class name when
        # alias is not set (None by default), otherwise str() raises TypeError.
        return self.alias or type(self).__name__

    @classmethod
    def spawn_with_settings(cls, settings: dict) -> 'WithSettings':
        """Spawns and returns object initialized with given settings.

        :param settings: Keyword arguments for the class constructor.

        """
        LOGGER.debug(f'Spawning `{cls.__name__}` object with the given settings ...')

        return cls(**settings)

    @classmethod
    def log_debug(cls, msg: str):
        """Sends the message (prefixed with the class name) to debug log.

        :param msg:

        """
        LOGGER.debug(f'{cls.__name__}: {msg}')

    @classmethod
    def log_info(cls, msg: str):
        """Sends the message (prefixed with the class name) to info log.

        :param msg:

        """
        LOGGER.info(f'{cls.__name__}: {msg}')

    @classmethod
    def log_warning(cls, msg: str):
        """Sends the message (prefixed with the class name) to warning log.

        :param msg:

        """
        LOGGER.warning(f'{cls.__name__}: {msg}')

    @classmethod
    def log_error(cls, msg: str):
        """Sends the message (prefixed with the class name) to error log.

        :param msg:

        """
        LOGGER.error(f'{cls.__name__}: {msg}')

    def save_settings(self):
        """Saves object settings into torrt configuration file.

        Settings are the object attributes named after __init__() arguments.
        They are stored under `config_entry_name` -> `alias`.

        """
        settings = {}

        try:
            settings_names = getfullargspec(self.__init__).args

            del settings_names[0]  # do not need `self`

            for name in settings_names:
                settings[name] = getattr(self, name)

        except TypeError:
            pass  # Probably __init__ method is not user-defined.

        config.update({self.config_entry_name: {self.alias: settings}})
603

604

605
class TorrtConfig:
    """Gives methods to work with torrt configuration file."""

    USER_DATA_PATH = Path('~').expanduser() / '.torrt'
    USER_SETTINGS_FILE = USER_DATA_PATH / 'config.json'

    # Defaults: written into a fresh configuration file and used
    # to fill in entries missing from an existing one. Must never be mutated.
    _basic_settings: ClassVar[dict[str, Any]] = {
        'time_last_check': 0,
        'walk_interval_hours': 1,
        'rpc': {},
        'trackers': {},
        'torrents': {},
        'notifiers': {},
        'bots': {}
    }

    @classmethod
    def drop_section(cls, realm: str, key: str):
        """Drops config section by its key (name) and updates config.

        Does nothing if there is no such realm or key.

        :param realm: Top level configuration entry name, e.g. `torrents`.
        :param key: Name of the entry to drop from the realm.

        """
        try:
            cfg = cls.load()
            del cfg[realm][key]
            cls.save(cfg)

        except KeyError:
            pass

    @classmethod
    def bootstrap(cls):
        """Initializes configuration file if needed."""

        if not cls.USER_DATA_PATH.exists():
            # exist_ok: the directory may be created concurrently
            # between the check above and this call.
            cls.USER_DATA_PATH.mkdir(parents=True, exist_ok=True)

        if not cls.USER_SETTINGS_FILE.exists():
            cls.save(cls._basic_settings)

        # My precious.
        cls.USER_SETTINGS_FILE.chmod(0o600)

    @classmethod
    def update(cls, settings_dict: dict):
        """Updates configuration file with given settings.

        :param settings_dict: Settings to merge into the stored ones (see update_dict()).

        """
        cls.save(update_dict(cls.load(), settings_dict))

    @classmethod
    def load(cls) -> dict:
        """Returns current settings dictionary.

        The configuration file is initialized first, if needed.

        """
        from copy import deepcopy

        LOGGER.debug(f'Loading configuration file {cls.USER_SETTINGS_FILE} ...')

        cls.bootstrap()

        with cls.USER_SETTINGS_FILE.open() as f:
            settings = load(f)

        # Pick up settings entries added in new version
        # and put them into old user config.
        for key, val in cls._basic_settings.items():
            if key not in settings:
                # Copy: callers mutate the returned dict in place (see update()
                # and drop_section()), that must not leak into class level defaults.
                settings[key] = deepcopy(val)

        return settings

    @classmethod
    def save(cls, settings_dict: dict):
        """Saves a given dict as torrt configuration.

        :param settings_dict: Settings to write (as JSON) into the configuration file.

        """
        LOGGER.debug(f'Saving configuration file {cls.USER_SETTINGS_FILE} ...')

        with cls.USER_SETTINGS_FILE.open('w') as f:
            dump(settings_dict, f, indent=4)


# Short alias used within this module (see WithSettings.save_settings()).
config = TorrtConfig
692

693

694
class ObjectsRegistry:
3✔
695

696
    __slots__ = ['_items']
3✔
697

698
    def __init__(self):
3✔
699
        self._items: dict[str, Any] = {}
3✔
700

701
    def add(self, obj: Any):
3✔
702
        """Add an object to registry.
703

704
        NB: object MUST have `alias` attribute.
705

706
        :param obj:
707

708
        """
709
        name = obj.alias
3✔
710

711
        LOGGER.debug(f'Registering `{name}` from {obj} ...')
3✔
712

713
        self._items[name] = obj
3✔
714

715
    def get(self, obj_alias: str | None = None) -> dict[str, Any] | Any:
3✔
716
        """Returns registered objects or a definite object by its alias,
717
        or registry items if no alias provided.
718

719
        :param obj_alias:
720

721
        """
722
        if obj_alias is None:
3✔
723
            return self._items
3✔
724

725
        return self._items.get(obj_alias)
3✔
726

727
    def get_for_string(self, string: str) -> Any | None:
3✔
728
        """Returns registered object which can handle a given string.
729

730
        :param string:
731

732
        """
733
        for name, obj in self._items.items():
3✔
734
            can_handle_method = getattr(obj, 'can_handle', None)
3✔
735

736
            if can_handle_method and can_handle_method(string):
3✔
737
                return obj
3✔
738

739
            elif name in string:
3✔
740
                return obj
×
741

742
        return None
3✔
743

744

745
# Module level registries: one pair per entity kind.
# NOTE(review): `*Classes*` presumably hold entity classes and `*Objects*`
# hold configured instances - confirm against the registering code.

# RPC.
RPCClassesRegistry = ObjectsRegistry()
RPCObjectsRegistry = ObjectsRegistry()

# Trackers.
TrackerClassesRegistry = ObjectsRegistry()
TrackerObjectsRegistry = ObjectsRegistry()

# Notifiers.
NotifierClassesRegistry = ObjectsRegistry()
NotifierObjectsRegistry = ObjectsRegistry()

# Bots.
BotClassesRegistry = ObjectsRegistry()
BotObjectsRegistry = ObjectsRegistry()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc