AntiCompositeNumber / iNaturalistReviewer, build 13210719831 (push, via GitHub)

08 Feb 2025 01:06AM UTC coverage: 62.381% (-0.6%) from 62.943%

Commit by AntiCompositeNumber: "inrbot: Add exponential backoff for PetScan errors"

Also bumps petscan failures down to warning level.

192 of 354 branches covered (54.24%)

Branch coverage included in aggregate %.

15 of 31 new or added lines in 1 file covered. (48.39%)

1 existing line in 1 file now uncovered.

657 of 1007 relevant lines covered (65.24%)

0.65 hits per line
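
As a check on the aggregate: combining line and branch hits gives (657 + 192) / (1007 + 354) = 849 / 1361 ≈ 62.38%, consistent with the reported 62.381% total.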

Source File: /src/inrbot.py (77.25%)

1
#!/usr/bin/env python3
2
# coding: utf-8
3
# SPDX-License-Identifier: GPL-3.0-or-later
4
# Copyright 2023 AntiCompositeNumber
5

6
import argparse
1✔
7
import datetime
1✔
8
import hashlib
1✔
9
import itertools
1✔
10
import json
1✔
11
import logging
1✔
12
import logging.config
1✔
13
import os
1✔
14
import re
1✔
15
import string
1✔
16
import time
1✔
17
import urllib.parse
1✔
18
from hmac import compare_digest
1✔
19
from io import BytesIO
1✔
20
from pathlib import Path
1✔
21

22
import imagehash  # type: ignore
1✔
23
import mwparserfromhell as mwph  # type: ignore
1✔
24
import pywikibot  # type: ignore
1✔
25
import pywikibot.pagegenerators as pagegenerators  # type: ignore
1✔
26
import requests
1✔
27
import PIL.Image  # type: ignore
1✔
28
import waybackpy
1✔
29

30
from typing import NamedTuple, Optional, Set, Tuple, Dict, Union, cast, Callable, List
1✔
31
from typing import Any, Iterator
1✔
32

33
import acnutils
1✔
34

35
__version__ = "2.6.0"
1✔
36

37
logger = acnutils.getInitLogger("inrbot", level="VERBOSE", filename="inrbot.log")
1✔
38

39
site = pywikibot.Site("commons", "commons")
1✔
40
username = site.username()
1✔
41
summary_tag = f"(inrbot {__version__})"
1✔
42
user_agent = (
1✔
43
    f"Bot iNaturalistReviewer/{__version__} "
44
    "on Wikimedia Toolforge "
45
    f"(Contact: https://commons.wikimedia.org/wiki/User:{username}; "
46
    "https://www.inaturalist.org/people/anticompositenumber "
47
    "tools.inaturalistreviewer@tools.wmflabs.org) "
48
    f"Python requests/{requests.__version__}"
49
)
50

51
session = requests.Session()
1✔
52
session.headers.update({"user-agent": user_agent})
1✔
53
recent_bytes = {}
1✔
54
conf_ts = None
1✔
55

56
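# Registries of callables consulted during a review. compare_methods is filled
# by init_compare_methods() below; the hook lists start empty and can be
# populated by external extensions.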
compare_methods: List[Tuple[str, Callable]] = []
1✔
57
pre_save_hooks: List[Callable] = []
1✔
58
id_hooks: List[Callable] = []
1✔
59
status_hooks: List[Callable] = []
1✔
60
lock_hooks: List[Callable] = []
1✔
61

62

63
class iNaturalistID(NamedTuple):
1✔
64
    id: str
1✔
65
    type: str
1✔
66
    url: str = ""
1✔
67

68
    def __str__(self):
1✔
69
        return f"https://www.inaturalist.org/{self.type}/{self.id}"
1✔
70

71
    def __eq__(self, other):
1✔
72
        if isinstance(other, iNaturalistID):
1!
73
            return self.id == other.id and self.type == other.type
1✔
74
        elif isinstance(other, iNaturalistImage):
×
75
            return self.id == other.id.id and self.type == other.id.type
×
76
        else:
77
            return NotImplemented
×
78

79

80
class RestartBot(RuntimeError):
1✔
81
    pass
1✔
82

83

84
class ProcessingError(Exception):
1✔
85
    def __init__(self, reason_code: str, description: str = ""):
1✔
86
        self.reason_code = reason_code
1✔
87
        self.description = description
1✔
88

89

90
class StopReview(Exception):
1✔
91
    def __init__(self, reason: str):
1✔
92
        self.reason = reason
1✔
93

94

95
class ExponentialRateLimit:
1✔
96
    """Provide an exponential backoff based on calls to failure()
97

98
    Unlike acnutils.Throttle, this throttle is not self-enforcing.
99
    Instead, call should_run() to determine if a request to a flaky service
100
    should be made.
101
    """
102

103
    def __init__(
1✔
104
        self, interval_seconds: int, base: int, max_fails: int = 0, log_name: str = ""
105
    ) -> None:
106
        """
107
        :param interval_seconds: Delay interval, in seconds.
108
        :param base: Base of exponential delay
109
        :param max_fails: Number of failures beyond which delay should not increase.
110
            Set to 0 for no maximum.
111
        """
112
        self.max_fails = max_fails
1✔
113
        self.interval = interval_seconds
1✔
114
        self.base = base
1✔
115
        self.fails = 0
1✔
116
        self.max_fails = max_fails
1✔
117
        self.last_request = 0.0
1✔
118
        self.log_name = log_name
1✔
119

120
    def success(self) -> None:
1✔
NEW
121
        if self.fails > 0:
×
NEW
122
            self.fails = self.fails - 1
×
NEW
123
        self.last_request = time.monotonic()
×
124

125
    def failure(self) -> None:
1✔
NEW
126
        if self.max_fails == 0 or self.fails < self.max_fails:
×
NEW
127
            self.fails = self.fails + 1
×
NEW
128
        elif self.log_name:
×
NEW
129
            logger.error(f"{self.log_name}: Maximum failures exceeded")
×
130

NEW
131
        self.last_request = time.monotonic()
×
132

133
    def backoff_seconds(self) -> int:
1✔
NEW
134
        return self.interval * (self.base**self.fails)
×
135

136
    def should_run(self) -> bool:
1✔
NEW
137
        if self.fails == 0:
×
NEW
138
            return True
×
139

NEW
140
        return self.last_request + self.backoff_seconds() <= time.monotonic()
×
141

142

143
petscan_backoff = ExponentialRateLimit(
1✔
144
    interval_seconds=60, base=6, max_fails=4, log_name="PetScan backoff"
145
)
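# With these parameters the wait after N consecutive failures is 60 * 6**N
# seconds: about 6 min, 36 min, 3.6 h, then 21.6 h, beyond which max_fails=4
# stops the delay from growing. Sketch of the intended call pattern (see
# untagged_files_to_check() below for the real one), assuming some flaky
# request function fetch():
#
#     if petscan_backoff.should_run():
#         try:
#             result = fetch()
#             petscan_backoff.success()
#         except Exception:
#             petscan_backoff.failure()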
146

147

148
def get_config() -> Tuple[dict, datetime.datetime]:
1✔
149
    """Load on-wiki configuration"""
150
    page = pywikibot.Page(site, "User:iNaturalistReviewBot/config.json")
1✔
151
    conf_json = json.loads(page.text)
1✔
152
    logger.info(f"Loaded config from {page.title(as_link=True)}")
1✔
153
    logger.debug(json.dumps(conf_json, indent=2))
1✔
154
    ts = datetime.datetime.utcnow()
1✔
155
    return conf_json, ts
1✔
156

157

158
def check_config() -> None:
1✔
159
    page = pywikibot.Page(site, "User:iNaturalistReviewBot/config.json")
1✔
160
    if conf_ts and page.latest_revision.timestamp > conf_ts:
1!
161
        raise RestartBot("Configuration has been updated, bot will restart")
×
162

163

164
def init_compare_methods() -> None:
1✔
165
    global compare_methods
166
    compare_methods = []
1✔
167
    if "sha1" in config["compare_methods"]:
1✔
168
        compare_methods.append(("sha1", compare_sha1))
1✔
169
    if "phash" in config["compare_methods"]:
1✔
170
        compare_methods.append(("phash", compare_phash))
1✔
171

172

173
def files_to_check(start: Optional[str] = None) -> Iterator[pywikibot.page.BasePage]:
1✔
174
    """Iterate list of files needing review from Commons"""
175
    category = pywikibot.Category(site, "Category:INaturalist review needed")
×
176
    do_heartbeat()
×
177
    for page in pagegenerators.CategorizedPageGenerator(
×
178
        category, namespaces=6, start=start
179
    ):
180
        yield page
×
181

182

183
def untagged_files_to_check() -> Iterator[pywikibot.page.BasePage]:
1✔
NEW
184
    if not (config.get("find_untagged") and petscan_backoff.should_run()):
×
185
        pages = []
×
186
    else:
187
        try:
×
188
            res = session.get(
×
189
                config["petscan_url"], params=config["untagged_petscan_query"]
190
            )
191
            res.raise_for_status()
×
192

193
            data = res.json()
×
194
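            # Unwrap PetScan's JSON envelope: check the result marker, then
            # pull the list of page records out of the nested structure.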
            assert data["n"] == "result"
×
195
            pages = data["*"][0]["a"]["*"]
×
NEW
196
            petscan_backoff.success()
×
197
        except Exception as err:
×
NEW
198
            logger.warning(err)
×
199
            pages = []
×
NEW
200
            petscan_backoff.failure()
×
UNCOV
201
        logger.info(f"Found {len(pages)} untagged files to check")
×
202

203
    # Whether we get data from PetScan is unrelated to the health of inrbot
204
    do_heartbeat()
×
205
    for page_data in pages:
×
206
        yield pywikibot.FilePage(site, title=page_data["title"])
×
207

208

209
def gbif_to_ina_url(url: urllib.parse.ParseResult) -> str:
1✔
210
    path = url.path.split(sep="/")
1✔
211
    if path[1] != "occurrence":
1!
212
        return ""
×
213
    api_url = f"https://api.gbif.org/v1/occurrence/{path[2]}"
1✔
214
    res = session.get(api_url)
1✔
215
    res.raise_for_status()
1✔
216
    return res.json().get("references", "")
1✔
217

218

219
def parse_ina_url(raw_url: str) -> Optional[iNaturalistID]:
1✔
220
    """Parses an iNaturalist URL into an iNaturalistID named tuple"""
221
    url = urllib.parse.urlparse(raw_url)
1✔
222
    path = url.path.split(sep="/")
1✔
223
    netloc = url.netloc.lower()
1✔
224
    if len(path) == 3 and any(
1✔
225
        netloc.endswith(domain) for domain in config["inaturalist_domains"]
226
    ):
227
        return iNaturalistID(type=path[1], id=str(path[2]))
1✔
228
    elif len(path) == 4 and netloc in (
1✔
229
        "inaturalist-open-data.s3.amazonaws.com",
230
        "static.inaturalist.org",
231
    ):
232
        return iNaturalistID(type=path[1], id=str(path[2]))
1✔
233
    elif len(path) == 3 and netloc == "www.gbif.org":
1✔
234
        return parse_ina_url(gbif_to_ina_url(url))
1✔
235
    else:
236
        return None
1✔
237

238

239
class Image:
1✔
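    # Base class for a lazily loaded image: raw bytes, the PIL image, the sha1
    # digest, and the perceptual hash are computed on first access (by the
    # properties below and in the subclasses) and cached on the instance.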
240
    def __init__(
1✔
241
        self,
242
        raw: Optional[bytes] = None,
243
        image: Optional[PIL.Image.Image] = None,
244
        sha1: str = "",
245
        phash: Optional[imagehash.ImageHash] = None,
246
    ):
247
        self._raw = raw
1✔
248
        self._image = image
1✔
249
        self._sha1 = sha1
1✔
250
        self._phash = phash
1✔
251

252
    @property
1✔
253
    def phash(self) -> imagehash.ImageHash:
1✔
254
        if not self._phash:
1!
255
            self._phash = imagehash.phash(self.image)
1✔
256
        return self._phash
1✔
257

258
    @property
1✔
259
    def image(self):
1✔
260
        raise NotImplementedError
×
261

262
    def __repr__(self) -> str:
1✔
263
        paras = ", ".join(
×
264
            f"{key}={repr(value)}" for key, value in self.__dict__.items()
265
        )
266
        return f"{type(self).__name__}({paras})"
×
267

268
    def __eq__(self, other):
1✔
269
        if isinstance(other, Image):
1!
270
            return self.id == other.id
×
271
        elif isinstance(other, iNaturalistID):
1!
272
            return self.id == other
1✔
273
        else:
274
            return NotImplemented
×
275

276

277
class iNaturalistImage(Image):
1✔
278
    _cache: Dict[iNaturalistID, str] = {}
1✔
279

280
    def __init__(self, id: iNaturalistID, **kwargs):
1✔
281
        self.id = id
1✔
282
        super().__init__(**kwargs)
1✔
283

284
    @property
1✔
285
    def raw(self) -> bytes:
1✔
286
        if not self._raw:
1!
287
            self._raw = acnutils.retry(get_ina_image, 3, photo=self.id)
1✔
288
        return cast(bytes, self._raw)
1✔
289

290
    @property
1✔
291
    def image(self) -> PIL.Image.Image:
1✔
292
        if not self._image:
1!
293
            self._image = PIL.Image.open(BytesIO(self.raw))
1✔
294
        return self._image
1✔
295

296
    @property
1✔
297
    def sha1(self) -> str:
1✔
298
        if not self._sha1:
1✔
299
            if self.id in self._cache:
1✔
300
                self._sha1 = self._cache[self.id]
1✔
301
            else:
302
                sha1sum = hashlib.sha1()
1✔
303
                sha1sum.update(self.raw)
1✔
304
                self._sha1 = sha1sum.hexdigest()
1✔
305
                self._cache[self.id] = self._sha1
1✔
306
        return self._sha1
1✔
307

308

309
class CommonsImage(Image):
1✔
310
    def __init__(self, page: pywikibot.FilePage, **kwargs):
1✔
311
        self.page = page
1✔
312
        if self.page:
1✔
313
            self.page.get(force=True)
1✔
314
        super().__init__(**kwargs)
1✔
315

316
    @property
1✔
317
    def raw(self):
1✔
318
        return NotImplemented
×
319

320
    @property
1✔
321
    def image(self) -> PIL.Image.Image:
1✔
322
        """Download orignal Commons file and open as a PIL image"""
323
        if not self._image:
1!
324
            url = self.page.get_file_url()
1✔
325
            response = session.get(url)
1✔
326
            response.raise_for_status()
1✔
327
            self._image = PIL.Image.open(BytesIO(response.content))
1✔
328
        return self._image
1✔
329

330
    @property
1✔
331
    def sha1(self) -> str:
1✔
332
        if not self._sha1:
1✔
333
            self._sha1 = self.page.latest_file_info.sha1
1✔
334
        return self._sha1
1✔
335

336

337
def compare_sha1(com_img: CommonsImage, ina_img: iNaturalistImage) -> bool:
1✔
338
    logger.debug(f"Commons sha1sum:     {com_img.sha1}")
1✔
339
    logger.debug(f"iNaturalist sha1sum: {ina_img.sha1}")
1✔
340
    return compare_digest(com_img.sha1, ina_img.sha1)
1✔
341

342

343
def compare_phash(com_img: CommonsImage, ina_img: iNaturalistImage) -> bool:
1✔
344
    diff = com_img.phash - ina_img.phash
1✔
345
    logger.debug(f"PHash Hamming distance: {diff}")
1✔
346
    return diff <= config.get("max_phash_dist", 4)
1✔
347

348

349
def get_ina_image(photo: iNaturalistID, final: bool = False) -> bytes:
1✔
350
    """Download original photo from iNaturalist"""
351
    if photo.url:
1!
352
        extension = photo.url.partition("?")[0].rpartition(".")[2]
1✔
353
        domain = photo.url.partition("//")[2].partition("/")[0]
1✔
354
    else:
355
        extension = "jpeg"
×
356
        domain = "inaturalist-open-data.s3.amazonaws.com"
×
357
    # TODO: Replace this hardcoded URL
358
    url = f"https://{domain}/photos/{photo.id}/original.{extension}"
1✔
359
    response = session.get(url)
1✔
360
    if response.status_code == 403 and not final:
1!
361
        return get_ina_image(photo._replace(url=url.replace("jpeg", "jpg")), final=True)
×
362
    response.raise_for_status()
1✔
363
    return response.content
1✔
364

365

366
def bytes_throttle(length: int) -> None:
1✔
367
    hour_limit = 4.5e9
×
368
    day_limit = 23.5e9
×
369
    global recent_bytes
370
    logger.debug(f"Content length: {length}")
×
371
    now = datetime.datetime.now()
×
372
    recent_bytes[now] = length
×
373

374
    last_hour = 0
×
375
    last_day = 0
×
376
    for date, val in recent_bytes.copy().items():
×
377
        if now - date <= datetime.timedelta(hours=24):
×
378
            last_day += val
×
379
            if now - date <= datetime.timedelta(hours=1):
×
380
                last_hour += val
×
381
        else:
382
            del recent_bytes[date]
×
383

384
    logger.debug(f"Hour total: {last_hour}, day total: {last_day}")
×
385
    if last_day >= day_limit:
×
386
        logger.error(
×
387
            f"{last_day} bytes transferred in last 24h, approaching iNaturalist limits!"
388
        )
389
        sleep_time = 3600 * 12  # 12 hours
×
390
    elif last_hour >= hour_limit:
×
391
        logger.error(
×
392
            f"{last_hour} bytes transferred in last hour, "
393
            "approaching iNaturalist limits!"
394
        )
395
        sleep_time = 60 * 30  # 30 minutes
×
396
    else:
397
        return None
×
398
    logger.info(f"Sleeping for {sleep_time} seconds")
×
399
    time.sleep(sleep_time)
×
400
    return None
×
401

402

403
def do_heartbeat() -> None:
1✔
404
    """Update the timestamp on a file (if provided)
405

406
    Works with inrbot-healthcheck.sh when the HEARTBEAT_FILE environment variable is set
407
    """
408
    if os.environ.get("HEARTBEAT_FILE"):
1!
409
        Path(os.environ["HEARTBEAT_FILE"]).touch()
×
410

411

412
class Aliases:
1✔
413
    alias_cache: Dict[str, Dict[str, Union[float, Set[str]]]] = {}
1✔
414

415
    def __init__(self, title: str) -> None:
1✔
416
        self.title: str = title
1✔
417
        self._aliases: Optional[Set[str]] = None
1✔
418

419
    def get_aliases(self) -> None:
1✔
420
        canon_page = pywikibot.Page(site, f"Template:{self.title}")
1✔
421
        aliases = {
1✔
422
            page.title(with_ns=False).lower()
423
            for page in canon_page.backlinks(filter_redirects=True, namespaces=10)
424
        }
425
        aliases.add(canon_page.title(with_ns=False).lower())
1✔
426
        aliases.update(
1✔
427
            page.title(with_ns=False).lower().partition("/")[0]
428
            for page in canon_page.embeddedin(namespaces=10)
429
        )
430
        self._aliases = aliases
1✔
431

432
    @property
1✔
433
    def aliases(self):
1✔
434
        if self._aliases is None:
1✔
435
            cached = self.alias_cache.get(self.title)
1✔
436
            if cached is None or time.monotonic() - cached["last_update"] > 3600:
1✔
437
                self.get_aliases()
1✔
438
                self.alias_cache[self.title] = {
1✔
439
                    "last_update": time.monotonic(),
440
                    "aliases": self._aliases,
441
                }
442
            else:
443
                self._aliases = cached["aliases"]
1✔
444
        return self._aliases
1✔
445

446
    def is_license(self, template: mwph.nodes.Template) -> bool:
1✔
447
        if template.name.lower() in self.aliases:
1✔
448
            return True
1✔
449
        elif template.name.lower() == "self":
1!
450
            return True
×
451
        return False
1✔
452

453

454
def get_observation_from_photo(photo_id: iNaturalistID) -> iNaturalistID:
1✔
455
    assert photo_id.type == "photos"
1✔
456
    try:
1✔
457
        res = session.get(str(photo_id))
1✔
458
        res.raise_for_status()
1✔
459
    except Exception:
×
460
        raise ProcessingError("nourl", "No observation ID could be found")
×
461
    # Yes, I know I'm parsing HTML with a regex.
462
    match = re.search(r"/observations/(\d*)\"", res.text)
1✔
463
    if not match:
1!
464
        raise ProcessingError("nourl", "No observation ID could be found")
×
465
    else:
466
        return iNaturalistID(type="observations", id=match.group(1))
1✔
467

468

469
class CommonsPage:
1✔
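    # One Commons file under review: resolves the iNaturalist observation and
    # photo IDs, fetches both licenses, decides the review status, and writes
    # the {{iNaturalistreview}} template (plus any warnings/logs) back to wiki.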
470
    def __init__(
1✔
471
        self,
472
        page: pywikibot.FilePage,
473
        throttle: Optional[acnutils.Throttle] = None,
474
        ina_throttle: acnutils.Throttle = acnutils.Throttle(10),
475
    ) -> None:
476
        self.page = page
1✔
477
        self._com_license: Optional[str] = None
1✔
478
        self._ina_license: Optional[str] = None
1✔
479
        self._status = ""
1✔
480
        self._ina_author: Optional[str] = None
1✔
481
        self._ina_data: dict = {}
1✔
482
        self._is_old: Optional[bool] = None
1✔
483
        self._no_del: Optional[bool] = None
1✔
484
        self._archive = ""
1✔
485
        self.throttle = throttle
1✔
486
        self.ina_throttle = ina_throttle
1✔
487
        self.reason = ""
1✔
488
        self._photo_id: Optional[iNaturalistID] = None
1✔
489
        self._raw_photo_id: Optional[iNaturalistID] = None
1✔
490
        self._obs_id: Optional[iNaturalistID] = None
1✔
491
        self._locked = False
1✔
492
        self.photo_id_source = ""
1✔
493
        self.log_page = pywikibot.Page(site, config["untagged_log_page"])
1✔
494

495
    @property
1✔
496
    def locked(self) -> bool:
1✔
497
        return self._locked
1✔
498

499
    @locked.setter
1✔
500
    def locked(self, value: bool):
1✔
501
        if self._locked is False:
1!
502
            self._locked = value
1✔
503
        elif value is False:
×
504
            raise TypeError("Can not unlock parameters")
×
505

506
    def lock(self):
1✔
507
        if self.locked is False:
1✔
508
            for hook in lock_hooks:
1!
509
                hook(self)
×
510
            self.locked = True
1✔
511

512
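    # Locking helpers: once lock() has run (the review template has been
    # generated), these guarded setters refuse further changes so the values
    # already written into the template cannot drift afterwards.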
    def _set_locking(self, attr: str, value: Any) -> None:
1✔
513
        if not self.locked:
1✔
514
            setattr(self, attr, value)
1✔
515
        else:
516
            raise TypeError(f"{attr[1:]} has already been read, and can not be changed")
1✔
517

518
    def _get_locking_str(self, attr: str, setter: Optional[Callable] = None) -> str:
1✔
519
        if getattr(self, attr) is None:
1✔
520
            if self.locked:
1!
521
                setattr(self, attr, "")
1✔
522
            elif setter is not None:
×
523
                setter()
×
524
            else:
525
                raise AttributeError(attr[1:])
×
526
        return getattr(self, attr)
1✔
527

528
    def check_can_run(self) -> bool:
1✔
529
        """Determinies if the bot should run on this page and returns a bool."""
530
        page = self.page
1✔
531
        if (
1✔
532
            # Skip files that are still reported as an error
533
            (not self.check_has_template() and self.check_untagged_log())
534
            # Skip if the bot can't edit the page, due to permissions or {{bots}}
535
            or (not page.has_permission("edit"))
536
            or (not page.botMayEdit())
537
            # Skip if there's already a review template with parameters
538
            or (re.search(r"{{[iI][nN]aturalist[rR]eview *?\|.*?}}", page.text))
539
        ):
540
            return False
1✔
541
        else:
542
            return True
1✔
543

544
    def check_has_template(self) -> bool:
1✔
545
        return bool(re.search(r"{{[iI][nN]aturalist[rR]eview", self.page.text))
1✔
546

547
    def check_stop_cats(self) -> None:
1✔
548
        stop_cats = {
1✔
549
            pywikibot.Category(site, title) for title in config["stop_categories"]
550
        }
551
        page_cats = set(self.page.categories())
1✔
552
        page_stop = stop_cats & page_cats
1✔
553
        if page_stop:
1✔
554
            raise StopReview(str(page_stop))
1✔
555

556
    def find_ina_id(self) -> None:
1✔
557
        """Returns an iNaturalistID tuple from wikitext"""
558
        photos = []
1✔
559
        observations = []
1✔
560

561
        for url in self.page.extlinks():
1✔
562
            url_id = parse_ina_url(url)
1✔
563
            if (
1✔
564
                url_id is None
565
                or re.search(r"[A-z]", url_id.id)
566
                or url_id in photos
567
                or url_id in observations
568
            ):
569
                continue  # pragma: no cover
570
            elif url_id.type == "observations":
1✔
571
                observations.append(url_id)
1✔
572
            elif url_id.type == "photos":
1!
573
                photos.append(url_id)
1✔
574

575
        for hook in id_hooks:
1✔
576
            hook_id = hook(self, observations=observations.copy(), photos=photos.copy())
1✔
577
            if hook_id is None or re.search(r"[A-Za-z]", hook_id.id):
1✔
578
                continue  # pragma: no cover
579
            elif hook_id.type == "observations":
1✔
580
                observations.insert(0, hook_id)
1✔
581
            elif hook_id.type == "photos":
1!
582
                photos.insert(0, hook_id)
1✔
583
                observations = []
1✔
584

585
        if photos and observations:
1✔
586
            self.obs_id = observations[0]
1✔
587
            self.raw_photo_id = photos[0]
1✔
588
        elif observations:
1✔
589
            self.obs_id = observations[0]
1✔
590
            self.raw_photo_id = None
1✔
591
        elif photos:
1✔
592
            self.obs_id = None
1✔
593
            self.raw_photo_id = photos[0]
1✔
594
        else:
595
            raise ProcessingError("nourl", "No observation ID could be found")
1✔
596

597
    @property
1✔
598
    def photo_id(self) -> Optional[iNaturalistID]:
1✔
599
        return self._photo_id
1✔
600

601
    @photo_id.setter
1✔
602
    def photo_id(self, value: iNaturalistID):
1✔
603
        self._set_locking("_photo_id", value)
1✔
604

605
    @property
1✔
606
    def raw_photo_id(self) -> Optional[iNaturalistID]:
1✔
607
        return self._raw_photo_id
1✔
608

609
    @raw_photo_id.setter
1✔
610
    def raw_photo_id(self, value: iNaturalistID):
1✔
611
        self._raw_photo_id = value
1✔
612

613
    @property
1✔
614
    def obs_id(self) -> Optional[iNaturalistID]:
1✔
615
        if not self._obs_id and not self.locked:
1✔
616
            if self.raw_photo_id:
1✔
617
                self._obs_id = get_observation_from_photo(self.raw_photo_id)
1✔
618
        return self._obs_id
1✔
619

620
    @obs_id.setter
1✔
621
    def obs_id(self, value: iNaturalistID) -> None:
1✔
622
        self._set_locking("_obs_id", value)
1✔
623

624
    @obs_id.deleter
1✔
625
    def obs_id(self) -> None:
1✔
626
        if not self.locked:
1!
627
            self._obs_id = None
1✔
628
            del self.ina_data
1✔
629
        else:
630
            raise TypeError
×
631

632
    @property
1✔
633
    def ina_data(self) -> dict:
1✔
634
        """Make API request to iNaturalist from an ID and ID type
635

636
        Returns a dict of the API result
637
        """
638
        if not self._ina_data:
1✔
639
            assert self.obs_id
1✔
640
            if self.obs_id.type == "observations":
1✔
641
                url = f"https://api.inaturalist.org/v1/observations/{self.obs_id.id}"
1✔
642
            else:
643
                raise ProcessingError("apierr", "iNaturalist ID is wrong type")
1✔
644

645
            if self.throttle:
1!
646
                self.throttle.throttle()
×
647
            try:
1✔
648
                response = session.get(url, headers={"Accept": "application/json"})
1✔
649
                response.raise_for_status()
1✔
650
                response_json = response.json()
1✔
651
            except (ValueError, requests.exceptions.HTTPError) as err:
1✔
652
                raise ProcessingError("apierr", "iNaturalist API error") from err
1✔
653
            else:
654
                if response_json.get("total_results") != 1:
1✔
655
                    logger.debug(response_json)
1✔
656
                    raise ProcessingError("apierr", f"iNaturalist API error in {url}")
1✔
657
                res = response_json.get("results", [None])[0]
1✔
658
                if not res:
1✔
659
                    raise ProcessingError(
1✔
660
                        "apierr", f"No data recieved from iNaturalist in {url}"
661
                    )
662
                self._ina_data = res
1✔
663
        return self._ina_data
1✔
664

665
    @ina_data.deleter
1✔
666
    def ina_data(self) -> None:
1✔
667
        self._ina_data = {}
1✔
668

669
    def get_ina_license(self) -> None:
1✔
670
        """Find the image license in the iNaturalist API response
671

672
        If a license is found, self.ina_license is set to the matching Commons template name.
673
        If no license is found, a ProcessingError is raised.
674

675
        The API does not return CC version numbers, but the website has 4.0 links.
676
        CC 4.0 licenses are assumed.
677
        """
678
        assert self.photo_id
1✔
679
        licenses = config["ina_licenses"]
1✔
680
        photos: list = self.ina_data.get("photos", [])
1✔
681
        for photo_data in photos:
1✔
682
            if str(photo_data.get("id")) == self.photo_id.id:
1✔
683
                license_code = photo_data.get("license_code", "null")
1✔
684
                break
1✔
685
        else:
686
            raise ProcessingError("inatlicense", "No iNaturalist license found")
1✔
687

688
        if not license_code:
1!
689
            license_code = "null"
×
690

691
        try:
1✔
692
            self.ina_license = licenses[license_code]
1✔
693
        except KeyError as e:
×
694
            raise ProcessingError("inatlicense", "No iNaturalist license found") from e
×
695
        logger.info(f"iNaturalist License: {self.ina_license}")
1✔
696

697
    @property
1✔
698
    def ina_license(self) -> str:
1✔
699
        return self._get_locking_str("_ina_license", self.get_ina_license)
1✔
700

701
    @ina_license.setter
1✔
702
    def ina_license(self, value: str) -> None:
1✔
703
        self._set_locking("_ina_license", value)
1✔
704

705
    def find_photo_in_obs(self, recurse: bool = True) -> None:
1✔
706
        """Find the matching image in an iNaturalist observation
707

708
        Sets self.photo_id to the iNaturalistID of the matching photo.
709
        """
710
        images = [
1✔
711
            iNaturalistImage(
712
                id=iNaturalistID(type="photos", id=str(photo["id"]), url=photo["url"])
713
            )
714
            for photo in self.ina_data["photos"]
715
        ]
716
        if len(images) < 1:
1✔
717
            raise ProcessingError("notfound", "No photos in observation")
1✔
718
        elif self.raw_photo_id:
1✔
719
            # False sorts before True, otherwise remains in original order
720
            # This will sort the matching photo before other photos in the obs,
721
            # but will still check those other images if no match.
722
            images.sort(key=lambda image: self.raw_photo_id != image)
1✔
723

724
        commons_image = CommonsImage(page=self.page)
1✔
725

726
        for comp_method, comp_func in compare_methods:
1✔
727
            logger.info(f"Comparing photos using {comp_method}")
1✔
728
            for image in images:
1✔
729
                logger.debug(f"Comparing {str(image.id)}")
1✔
730
                try:
1✔
731
                    res = comp_func(com_img=commons_image, ina_img=image)
1✔
732
                except Exception:
×
733
                    res = False
×
734
                if res:
1✔
735
                    logger.info(f"Match found: {str(image.id)}")
1✔
736
                    self.reason = comp_method
1✔
737
                    self.photo_id = image.id
1✔
738
                    return
1✔
739
                elif self.throttle:
1!
740
                    self.throttle.throttle()
×
741
        if self.raw_photo_id and self.raw_photo_id not in images and recurse:
1✔
742
            del self.obs_id
1✔
743
            self.find_photo_in_obs(recurse=False)
1✔
744
        else:
745
            raise ProcessingError("notmatching", "No matching photos found")
1✔
746

747
    def get_ina_author(self):
1✔
748
        self.ina_author = self.ina_data.get("user", {}).get("login", "")
1✔
749
        logger.info(f"Author: {self.ina_author}")
1✔
750

751
    @property
1✔
752
    def ina_author(self) -> str:
1✔
753
        """Find the image author in the iNaturalist API response
754

755
        Returns a string with the username of the iNaturalist contributor
756
        """
757
        return self._get_locking_str("_ina_author", self.get_ina_author)
1✔
758

759
    @ina_author.setter
1✔
760
    def ina_author(self, value: str) -> None:
1✔
761
        self._set_locking("_ina_author", value)
1✔
762

763
    def get_com_license(self):
1✔
764
        """Find the license template currently used on the Commons page
765

766
        Returns the first license template used on the page. If no templates
767
        are found, return an empty string.
768
        """
769

770
        category = pywikibot.Category(site, "Category:Primary license tags (flat list)")
1✔
771
        templates = set(self.page.itertemplates())
1✔
772
        license_tags = set(category.members(namespaces=10))
1✔
773

774
        for template in templates:
1✔
775
            if template in license_tags:
1✔
776
                self._com_license = template.title(with_ns=False)
1✔
777
                break
1✔
778
        else:
779
            logger.info("No Commons license found!")
1✔
780
            self._com_license = ""
1✔
781
        logger.info(f"Commons License: {self.com_license}")
1✔
782

783
    @property
1✔
784
    def com_license(self) -> str:
1✔
785
        return self._get_locking_str("_com_license", self.get_com_license)
1✔
786

787
    @com_license.setter
1✔
788
    def com_license(self, value: str) -> None:
1✔
789
        self._set_locking("_com_license", value)
1✔
790

791
    def compare_licenses(self) -> None:
1✔
792
        free_licenses = set(config["free_licenses"])
1✔
793

794
        if not self.ina_license:
1✔
795
            # iNaturalist license wasn't found, call in the humans
796
            self.status = "error"
1✔
797
        elif self.ina_license not in free_licenses:
1✔
798
            # Source license is non-free, failed license review
799
            self.status = "fail"
1✔
800
        elif self.ina_license == self.com_license:
1✔
801
            # Licenses are the same, license review passes
802
            self.status = "pass"
1✔
803
        else:
804
            # Commons license doesn't match iNaturalist, update to match
805
            self.status = "pass-change"
1✔
806

807
    @property
1✔
808
    def status(self) -> str:
1✔
809
        """Checks the Commons license against the iNaturalist license
810

811
        Returns a string with the status
812
        Statuses:
813
            fail:       iNaturalist license is non-free
814
            error:      Bot could not determine the license
815
            pass:       Licenses match
816
            pass-change: Commons license changed to free iNaturalist license
817
        """
818
        if not self.locked:
1✔
819
            if not self._status:
1✔
820
                self.compare_licenses()
1✔
821
            for hook in status_hooks:
1✔
822
                hook(self)
1✔
823
        return self._status
1✔
824

825
    @status.setter
1✔
826
    def status(self, value):
1✔
827
        self._set_locking("_status", value)
1✔
828

829
    @status.deleter
1✔
830
    def status(self):
1✔
831
        self.status = ""
1✔
832

833
    def _file_is_old(self) -> bool:
1✔
834
        if not config.get("old_fail", False):
1✔
835
            return False
1✔
836

837
        timestamp = self.page.latest_file_info.timestamp
1✔
838
        if (datetime.datetime.now() - timestamp) > datetime.timedelta(
1✔
839
            days=config["old_fail_age"]
840
        ):
841
            return True
1✔
842
        else:
843
            return False
1✔
844

845
    @property
1✔
846
    def is_old(self) -> bool:
1✔
847
        if self._is_old is None:
1✔
848
            if self.status == "fail":
1✔
849
                self._is_old = self._file_is_old()
1✔
850
            else:
851
                self._is_old = False
1✔
852
        return self._is_old
1✔
853

854
    @is_old.setter
1✔
855
    def is_old(self, value: bool) -> None:
1✔
856
        self._set_locking("_is_old", value)
1✔
857

858
    @property
1✔
859
    def no_del(self) -> bool:
1✔
860
        if self._no_del is None:
1✔
861
            if self.status == "fail":
1✔
862
                page_templates = set(self.page.itertemplates())
1✔
863
                check_templates = {
1✔
864
                    pywikibot.Page(site, "Template:OTRS received"),
865
                    pywikibot.Page(site, "Template:Deletion template tag"),
866
                }
867
                self._no_del = not page_templates.isdisjoint(check_templates)
1✔
868
            else:
869
                self._no_del = False
1✔
870
        return self._no_del
1✔
871

872
    @no_del.setter
1✔
873
    def no_del(self, value) -> None:
1✔
874
        self._set_locking("_no_del", value)
1✔
875

876
    @property
1✔
877
    def archive(self) -> str:
1✔
878
        if not self._archive:
1✔
879
            if config.get("use_wayback") and self.status in ("pass", "pass-change"):
1!
880
                self.get_old_archive()
×
881
                if not self._archive:
×
882
                    self.save_archive()
×
883
            elif self.status == "fail" or (
1✔
884
                self.status != "error" and config.get("wayback_get", True)
885
            ):
886
                self.get_old_archive()
1✔
887
        return self._archive
1✔
888

889
    @archive.setter
1✔
890
    def archive(self, value: str) -> None:
1✔
891
        self._archive = value
1✔
892

893
    def save_archive(self) -> None:
1✔
894
        try:
×
895
            url = waybackpy.Url(str(self.photo_id), user_agent).save()
×
896
            assert url.archive_url is not None
×
897
            self.archive = url.archive_url
×
898
        except Exception as err:
×
899
            logger.warn("Failed to get archive", exc_info=err)
×
900
            self.archive = ""
×
901

902
    def get_old_archive(self) -> None:
1✔
903
        try:
×
904
            url = waybackpy.Url(str(self.photo_id), user_agent).oldest()
×
905
            assert url.archive_url is not None
×
906
            self.archive = url.archive_url
×
907
        except Exception as err:
×
908
            logger.info("Failed to get archive", exc_info=err)
×
909
            self.archive = ""
×
910
        else:
911
            if self.status == "fail":
×
912
                self.status = "fail-archive"
×
913

914
    def uploader_talk(self) -> pywikibot.page.Page:
1✔
915
        return pywikibot.Page(site, f"User talk:{self.page.oldest_file_info.user}")
1✔
916

917
    def update_review(self) -> bool:
1✔
918
        """Updates the wikitext with the review status"""
919
        logger.info(f"Status: {self.status} ({self.reason})")
1✔
920
        self.lock()
1✔
921
        code = mwph.parse(self.page.text)
1✔
922
        template = self.make_template()
1✔
923
        changed = False
1✔
924
        if self.check_has_template():
1✔
925
            # Already tagged for review, replace the existing template
926
            for review_template in code.ifilter_templates(
1✔
927
                matches=lambda t: t.name.strip().lower() == "inaturalistreview"
928
            ):
929
                code.replace(review_template, template)
1✔
930
                changed = True
1✔
931
        else:
932
            # Check for normal {{LicenseReview}} template
933
            for review_template in code.ifilter_templates(
1!
934
                matches=lambda t: re.search(r"[Ll]icense ?[Rr]eview", str(t))
935
            ):
936
                code.replace(review_template, template)
×
937
                changed = True
×
938

939
            if not changed:
1!
940
                # Not already tagged, try to put the review template under the license
941
                if self.com_license:
1✔
942
                    aliases = Aliases(self.com_license)
1✔
943
                    for pt2 in code.ifilter_templates(matches=aliases.is_license):
1✔
944
                        code.insert_after(pt2, "\n" + template)
1✔
945
                        changed = True
1✔
946
                else:
947
                    for node in code.ifilter(
1!
948
                        matches=lambda n: re.search(
949
                            r"(\[\[Category:|\{\{Uncategorized)", str(n)
950
                        )
951
                    ):
952
                        code.insert_before(node, template + "\n\n")
1✔
953
                        changed = True
1✔
954
                        break
1✔
955
                    else:
956
                        code.append("\n\n" + template)
×
957
                        changed = True
×
958

959
        if not changed:
1✔
960
            logger.info("Page not changed")
1✔
961
            return False
1✔
962

963
        if self.status == "pass-change":
1✔
964
            if self.com_license:
1✔
965
                aliases = Aliases(self.com_license)
1✔
966
                for pt2 in code.ifilter_templates(matches=aliases.is_license):
1✔
967
                    code.replace(pt2, ("{{%s}}" % self.ina_license))
1✔
968
            else:
969
                code.insert_before(template, ("{{%s}}" % self.ina_license))
1✔
970

971
        if self.status == "fail" and not self.no_del:
1✔
972
            code.insert(
1✔
973
                0,
974
                string.Template(
975
                    config["old_fail_tag"] if self.is_old else config["fail_tag"]
976
                ).safe_substitute(
977
                    review_license=self.ina_license,
978
                    source_url=str(self.photo_id) if self.photo_id else "",
979
                ),
980
            )
981

982
        if self.status in ["pass", "pass-change"] and config.get("tag_source"):
1✔
983
            self.add_source_tag(code)
1✔
984

985
        if self.throttle is not None:
1!
986
            self.throttle.throttle()
×
987
        try:
1✔
988
            self.save_page(str(code))
1✔
989
        except Exception as err:
×
990
            logging.exception(err)
×
991
            return False
×
992
        else:
993
            return True
1✔
994

995
    def make_template(self) -> str:
1✔
996
        """Constructs the iNaturalistreview template"""
997
        self.lock()
1✔
998
        if self.status == "stop":
1✔
999
            return ""
1✔
1000
        template = string.Template(config[self.status])
1✔
1001
        text = template.safe_substitute(
1✔
1002
            status=self.status,
1003
            author=self.ina_author,
1004
            source_url=str(self.photo_id) if self.photo_id else "",
1005
            review_date=datetime.date.today().isoformat(),
1006
            reviewer=username,
1007
            review_license=self.ina_license,
1008
            upload_license=self.com_license,
1009
            reason=self.reason,
1010
            archive=self.archive,
1011
        )
1012
        return text
1✔
1013

1014
    def add_source_tag(self, code: mwph.wikicode.Wikicode) -> None:
1✔
1015
        source_tag = ""
1✔
1016
        templates = set(self.page.itertemplates())
1✔
1017
        if not self.obs_id or not config["tag_source"]:
1✔
1018
            return
1✔
1019
        if pywikibot.Page(site, "Template:INaturalist") not in templates:
1!
1020
            source_tag += "\n{{iNaturalist|%s}}" % self.obs_id.id
1✔
1021

1022
        gbif_links = [
1✔
1023
            link
1024
            for link in self.ina_data.get("outlinks", [])
1025
            if link["source"] == "GBIF"
1026
        ]
1027
        if gbif_links and pywikibot.Page(site, "Template:Gbif") not in templates:
1!
1028
            gbif_id = gbif_links[0]["url"].split("/")[-1]
1✔
1029
            source_tag += "\n{{gbif|%s}}" % gbif_id
1✔
1030

1031
        if not source_tag:
1!
1032
            return
×
1033

1034
        try:
1✔
1035
            # Place templates at the bottom of =={{int:filedesc}}==,
1036
            # after any other templates but before categories/other text
1037
            prev = code.get_sections(matches="filedesc")[0].filter_templates(
1✔
1038
                recursive=False
1039
            )[-1]
1040
        except IndexError:
1✔
1041
            # If there is no Summary section, just place after {{iNaturalistreview}}
1042
            prev = code.filter_templates(
1✔
1043
                matches=lambda t: t.name.strip().lower() == "inaturalistreview"
1044
            )[0]
1045

1046
        code.insert_after(prev, source_tag)
1✔
1047

1048
    def save_page(self, new_text: str) -> None:
1✔
1049
        """Replaces the wikitext of the specified page with new_text
1050

1051
        If the global simulate variable is true, the wikitext will be printed
1052
        instead of saved to Commons.
1053
        """
1054

1055
        summary = string.Template(config["review_summary"]).safe_substitute(
1✔
1056
            status=self.status,
1057
            review_license=self.ina_license,
1058
            version=__version__,
1059
            tag=summary_tag,
1060
        )
1061
        for hook in pre_save_hooks:
1!
1062
            hook(
×
1063
                self,
1064
                new_text=new_text,
1065
                summary=summary,
1066
            )
1067
        if not simulate:
1✔
1068
            acnutils.check_runpage(site, override=run_override)
1✔
1069
            logger.info(f"Saving {self.page.title()}")
1✔
1070
            acnutils.retry(
1✔
1071
                acnutils.save_page,
1072
                3,
1073
                text=new_text,
1074
                page=self.page,
1075
                summary=summary,
1076
                bot=False,
1077
                minor=False,
1078
            )
1079
        else:
1080
            logger.info("Saving disabled")
1✔
1081
            logger.debug(summary)
1✔
1082
            logger.debug(new_text)
1✔
1083

1084
    def fail_warning(self) -> None:
1✔
1085
        user_talk = self.uploader_talk()
1✔
1086
        message = string.Template(
1✔
1087
            config["old_fail_warn"] if self.is_old else config["fail_warn"]
1088
        ).safe_substitute(
1089
            filename=self.page.title(with_ns=True),
1090
            review_license=self.ina_license,
1091
            source_url=str(self.photo_id) if self.photo_id else "",
1092
        )
1093
        summary = string.Template(config["review_summary"]).safe_substitute(
1✔
1094
            status="fail",
1095
            review_license=self.ina_license,
1096
            version=__version__,
1097
            tag=summary_tag,
1098
        )
1099
        if not simulate:
1!
1100
            acnutils.check_runpage(site, override=run_override)
1✔
1101
            logger.info(f"Saving {user_talk.title()}")
1✔
1102
            acnutils.retry(
1✔
1103
                acnutils.save_page,
1104
                3,
1105
                text=message,
1106
                page=user_talk,
1107
                summary=summary,
1108
                bot=False,
1109
                minor=False,
1110
                mode="append",
1111
            )
1112
        else:
1113
            logger.info("Saving disabled")
×
1114
            logger.info(summary)
×
1115
            logger.info(message)
×
1116

1117
    def log_untagged_error(self) -> None:
1✔
1118
        if simulate:
×
1119
            return
×
1120
        if self.page.title() not in self.log_page.text:
×
1121
            message = string.Template(config["untagged_log_line"]).safe_substitute(
×
1122
                status=self.status,
1123
                reason=self.reason,
1124
                link=self.page.title(as_link=True, textlink=True),
1125
            )
1126
            summary = string.Template(config["untagged_log_summary"]).safe_substitute(
×
1127
                status=self.status,
1128
                reason=self.reason,
1129
                link=self.page.title(as_link=True, textlink=True),
1130
                version=__version__,
1131
                tag=summary_tag,
1132
            )
1133
            acnutils.check_runpage(site, override=run_override)
×
1134
            acnutils.retry(
×
1135
                acnutils.save_page,
1136
                3,
1137
                text=message,
1138
                page=self.log_page,
1139
                summary=summary,
1140
                bot=False,
1141
                minor=False,
1142
                mode="append",
1143
            )
1144

1145
    def remove_untagged_log(self) -> None:
1✔
1146
        """
1147
        Removes a file from the untagged error log
1148
        """
1149
        new_text, changes = re.subn(
1✔
1150
            r"^.*?{0}.*\n?".format(re.escape(str(self.page.title()))),
1151
            "",
1152
            self.log_page.text,
1153
            flags=re.MULTILINE,
1154
        )
1155
        summary = string.Template(
1✔
1156
            config["untagged_remove_log_summary"]
1157
        ).safe_substitute(
1158
            link=self.page.title(as_link=True, textlink=True),
1159
            version=__version__,
1160
            tag=summary_tag,
1161
        )
1162

1163
        if changes == 0:
1!
1164
            return
1✔
1165
        if simulate:
×
1166
            logger.debug(summary)
×
1167
            logger.debug(new_text)
×
1168
        else:
1169
            acnutils.retry(
×
1170
                acnutils.save_page,
1171
                3,
1172
                text=new_text,
1173
                page=self.log_page,
1174
                summary=summary,
1175
                bot=False,
1176
                minor=False,
1177
            )
1178

1179
    def check_untagged_log(self) -> bool:
1✔
1180
        """
1181
        Returns True if the file is on the untagged log
1182
        """
1183
        for page in self.log_page.linkedPages(namespaces=6):
1✔
1184
            if page == self.page:
1✔
1185
                return True
1✔
1186
        return False
1✔
1187

1188
    def review_file(
1✔
1189
        self, throttle: Optional[acnutils.Throttle] = None
1190
    ) -> Optional[bool]:
1191
        """Performs a license review on the input page
1192

1193
        self.page must be in the file namespace.
1194

1195
        Returns None if the file was skipped
1196
        Returns False if there was an error during review
1197
        Returns True if the file was successfully reviewed (pass or fail)
1198
        """
1199
        logger.info(f"Checking {self.page.title(as_link=True)}")
1✔
1200

1201
        acnutils.check_runpage(site, override=run_override)
1✔
1202
        if not self.check_can_run():
1✔
1203
            return None
1✔
1204

1205
        #####
1206
        try:
1✔
1207
            self.check_stop_cats()
1✔
1208
            # Get iNaturalistID
1209
            self.find_ina_id()
1✔
1210
            logger.info(f"ID found in wikitext: {self.obs_id} {self.raw_photo_id}")
1✔
1211

1212
            try:
1✔
1213
                self.find_photo_in_obs()
1✔
1214
            except ProcessingError as err:
×
1215
                if (
×
1216
                    err.reason_code in ("apierr", "notfound")
1217
                    and self.raw_photo_id
1218
                    and self.obs_id
1219
                ):
1220
                    # Observation ID probably doesn't exist.
1221
                    # If we've got a photo ID, try that.
1222
                    del self.obs_id
×
1223
                    self.find_photo_in_obs()
×
1224
                else:
1225
                    raise
×
1226
            self.compare_licenses()
1✔
1227
            self.get_ina_author()
1✔
1228
            self.archive
1✔
1229

1230
        except ProcessingError as err:
1✔
1231
            logger.info("Processing failed:", exc_info=err)
1✔
1232
            self.status = "error"
1✔
1233
            self.reason = err.reason_code
1✔
1234
        except StopReview as err:
1✔
1235
            logger.info(f"Image already reviewed, contains {err.reason}")
1✔
1236
            self.status = "stop"
1✔
1237
        except (acnutils.RunpageError, KeyboardInterrupt, ConnectionError) as err:
1✔
1238
            raise err
1✔
1239
        except Exception as err:
1✔
1240
            logger.exception(err)
1✔
1241
            self.status = "error"
1✔
1242
            self.reason = repr(err)
1✔
1243

1244
        if self.status == "error" and not self.check_has_template():
1✔
1245
            # Not previously tagged, don't need to throw an error message on it.
1246
            logger.info("Skipping...")
1✔
1247
            self.log_untagged_error()
1✔
1248
            # TODO: report out failures/maintain skip list
1249

1250
            return False
1✔
1251
        reviewed = self.update_review()
1✔
1252
        if self.status == "fail" and reviewed and not self.no_del:
1✔
1253
            self.fail_warning()
1✔
1254

1255
        if reviewed:
1✔
1256
            self.remove_untagged_log()
1✔
1257

1258
        return reviewed
1✔
1259

1260

1261
def main(
1✔
1262
    page: Optional[pywikibot.page.BasePage] = None,
1263
    total: int = 0,
1264
    start: Optional[str] = None,
1265
) -> None:
1266
    """Main loop for program"""
1267
    # Enumerate starts at 0, so to get N items, count to N-1.
1268
    if page:
1✔
1269
        # When given a page, check only that page
1270
        cpage = CommonsPage(pywikibot.FilePage(page))
1✔
1271
        cpage.review_file()
1✔
1272
    else:
1273
        # Otherwise, run automatically
1274
        # If total is 0, run continuously.
1275
        # If total is non-zero, check that many files
1276
        logger.info("Beginning loop")
1✔
1277
        i = 0
1✔
1278
        running = True
1✔
1279
        throttle = acnutils.Throttle(config.get("edit_throttle", 60))
1✔
1280
        while (not total) or (i < total):
1✔
1281
            for page in itertools.chain(
1✔
1282
                files_to_check(start), untagged_files_to_check()
1283
            ):
1284
                do_heartbeat()
1✔
1285
                try:
1✔
1286
                    cpage = CommonsPage(pywikibot.FilePage(page))
1✔
1287
                except ValueError:
×
1288
                    continue
×
1289

1290
                if total and i >= total:
1✔
1291
                    break
1✔
1292
                i += 1
1✔
1293

1294
                try:
1✔
1295
                    check_config()
1✔
1296
                    cpage.review_file()
1✔
1297
                except (acnutils.RunpageError, RestartBot, ConnectionError) as err:
1✔
1298
                    # Blocks and runpage checks always stop
1299
                    logger.exception(err)
1✔
1300
                    raise
1✔
1301
                except Exception as err:
1✔
1302
                    if running:
1✔
1303
                        logger.exception(err)
1✔
1304
                        running = False
1✔
1305
                    else:
1306
                        # If this exception happened after running out
1307
                        # of pages or another exception, stop the bot.
1308
                        logger.exception(err)
1✔
1309
                        raise
1✔
1310
                else:
1311
                    running = True
1✔
1312
                throttle.throttle()
1✔
1313
            else:
1314
                # If the for loop drops out, there are no more pages right now
1315
                if running:
1!
1316
                    running = False
1✔
1317
                    logger.warning("Out of pages to check!")
1✔
1318
                # May need to adjust this number depending on load
1319
                else:
1320
                    time.sleep(60)
×
1321

1322

1323
config, conf_ts = get_config()
1✔
1324
init_compare_methods()
1✔
1325
if __name__ == "__main__":
1!
1326
    parser = argparse.ArgumentParser(
×
1327
        description="Review files from iNaturalist on Commons",
1328
        prog="iNaturalistReviewer",
1329
    )
1330
    run_method = parser.add_mutually_exclusive_group(required=True)
×
1331
    run_method.add_argument(
×
1332
        "--auto", action="store_true", help="run the bot automatically"
1333
    )
1334
    run_method.add_argument(
×
1335
        "--file", action="store", help="run the bot only on the specified file"
1336
    )
1337
    parser.add_argument(
×
1338
        "--total",
1339
        action="store",
1340
        type=int,
1341
        help="review no more than this number of files in automatic mode",
1342
        default=0,
1343
    )
1344
    parser.add_argument(
×
1345
        "--ignore-runpage",
1346
        action="store_true",
1347
        dest="ignore_runpage",
1348
        help="skip the runpage check for testing",
1349
    )
1350
    parser.add_argument(
×
1351
        "--start",
1352
        action="store",
1353
        help="sortkey to start iterating at",
1354
        default=None,
1355
    )
1356
    sim = parser.add_mutually_exclusive_group()
×
1357
    sim.add_argument(
×
1358
        "--simulate",
1359
        action="store_true",
1360
        help="print the output wikitext instead of saving to Commons",
1361
    )
1362
    sim.add_argument(
×
1363
        "--no-simulate",
1364
        action="store_true",
1365
        dest="no_simulate",
1366
        help="forces saving when disabled by --ignore-runpage",
1367
    )
1368
    parser.add_argument(
×
1369
        "--version", action="version", version="%(prog)s " + __version__
1370
    )
1371
    args = parser.parse_args()
×
1372

1373
    run_override = args.ignore_runpage
×
1374
    if run_override:
×
1375
        if args.no_simulate:
×
1376
            simulate = False
×
1377
        else:
1378
            simulate = True
×
1379
    else:
1380
        simulate = args.simulate
×
1381

1382
    site.login()
×
1383
    if args.auto:
×
1384
        main(total=args.total, start=args.start)
×
1385
    elif args.file and "File" in args.file:
×
1386
        main(page=pywikibot.Page(site, args.file))
×
1387
else:
1388
    run_override = False
1✔
1389
    simulate = False
1✔