• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

AntiCompositeNumber / iNaturalistReviewer / 14254510804

03 Apr 2025 11:12PM UTC coverage: 63.63%. Remained the same
14254510804

Pull #367

github

web-flow
Merge e67690d9f into 1533e5144
Pull Request #367: Bump the development-dependencies group across 1 directory with 9 updates

196 of 352 branches covered (55.68%)

Branch coverage included in aggregate %.

670 of 1009 relevant lines covered (66.4%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.8
/src/inrbot.py
1
#!/usr/bin/env python3
2
# coding: utf-8
3
# SPDX-License-Identifier: GPL-3.0-or-later
4
# Copyright 2023 AntiCompositeNumber
5

6
import argparse
1✔
7
import datetime
1✔
8
import hashlib
1✔
9
import itertools
1✔
10
import json
1✔
11
import logging
1✔
12
import logging.config
1✔
13
import os
1✔
14
import re
1✔
15
import string
1✔
16
import time
1✔
17
import urllib.parse
1✔
18
from hmac import compare_digest
1✔
19
from io import BytesIO
1✔
20
from pathlib import Path
1✔
21

22
import imagehash  # type: ignore
1✔
23
import mwparserfromhell as mwph  # type: ignore
1✔
24
import pywikibot  # type: ignore
1✔
25
import pywikibot.pagegenerators as pagegenerators  # type: ignore
1✔
26
import requests
1✔
27
import PIL.Image  # type: ignore
1✔
28
import waybackpy
1✔
29

30
from typing import NamedTuple, Optional, Set, Tuple, Dict, Union, cast, Callable, List
1✔
31
from typing import Any, Iterator
1✔
32

33
import acnutils
1✔
34

35
__version__ = "2.7.0"
1✔
36

37
logger = acnutils.getInitLogger("inrbot", level="VERBOSE", filename="inrbot.log")
1✔
38

39
site = pywikibot.Site("commons", "commons")
1✔
40
username = site.username()
1✔
41
summary_tag = f"(inrbot {__version__})"
1✔
42
user_agent = (
1✔
43
    f"Bot iNaturalistReviewer/{__version__} "
44
    "on Wikimedia Toolforge "
45
    f"(Contact: https://commons.wikimedia.org/wiki/User:{username}; "
46
    "https://www.inaturalist.org/people/anticompositenumber "
47
    "tools.inaturalistreviewer@tools.wmflabs.org) "
48
    f"Python requests/{requests.__version__}"
49
)
50

51
session = requests.Session()
1✔
52
session.headers.update({"user-agent": user_agent})
1✔
53
recent_bytes = {}
1✔
54
conf_ts = None
1✔
55

56
compare_methods: List[Tuple[str, Callable]] = []
1✔
57
pre_save_hooks: List[Callable] = []
1✔
58
id_hooks: List[Callable] = []
1✔
59
status_hooks: List[Callable] = []
1✔
60
lock_hooks: List[Callable] = []
1✔
61

62

63
class iNaturalistID(NamedTuple):
1✔
64
    id: str
1✔
65
    type: str
1✔
66
    url: str = ""
1✔
67

68
    def __str__(self):
1✔
69
        return f"https://www.inaturalist.org/{self.type}/{self.id}"
1✔
70

71
    def __eq__(self, other):
1✔
72
        if isinstance(other, iNaturalistID):
1!
73
            return self.id == other.id and self.type == other.type
1✔
74
        elif isinstance(other, iNaturalistImage):
×
75
            return self.id == other.id.id and self.type == other.id.type
×
76
        else:
77
            return NotImplemented
×
78

79

80
class RestartBot(RuntimeError):
1✔
81
    pass
1✔
82

83

84
class ProcessingError(Exception):
1✔
85
    def __init__(self, reason_code: str, description: str = ""):
1✔
86
        self.reason_code = reason_code
1✔
87
        self.description = description
1✔
88

89

90
class StopReview(Exception):
1✔
91
    def __init__(self, reason: str):
1✔
92
        self.reason = reason
1✔
93

94

95
class ExponentialRateLimit:
1✔
96
    """Provide an exponential backoff based on calls to failure()
97

98
    Unlike acnutils.Throttle, this throttle is not self-enforcing.
99
    Instead, call should_run() to determine if a request to a flaky service
100
    should be made.
101
    """
102

103
    def __init__(
1✔
104
        self, interval_seconds: int, base: int, max_fails: int = 0, log_name: str = ""
105
    ) -> None:
106
        """
107
        :param interval_seconds: Delay interval, in seconds.
108
        :param base: Base of exponential delay
109
        :param max_fails: Number of failures beyond which delay should not increase.
110
            Set to 0 for no maximum.
111
        """
112
        self.max_fails = max_fails
1✔
113
        self.interval = interval_seconds
1✔
114
        self.base = base
1✔
115
        self.fails = 0
1✔
116
        self.max_fails = max_fails
1✔
117
        self.last_request = 0.0
1✔
118
        self.logger = logger.getChild(log_name or "ExponentialRateLimit")
1✔
119

120
    def success(self) -> None:
1✔
121
        if self.fails >= 0:
1!
122
            self.fails = self.fails - 1
1✔
123
        self.last_request = time.monotonic()
1✔
124

125
    def failure(self) -> None:
1✔
126
        if self.max_fails == 0 or self.fails < self.max_fails:
1!
127
            self.fails = self.fails + 1
1✔
128
        else:
129
            self.logger.error(
×
130
                f"Maximum failures exceeded ({self.fails=} >= {self.max_fails=})"
131
            )
132

133
        self.last_request = time.monotonic()
1✔
134

135
    def backoff_seconds(self) -> int:
1✔
136
        return self.interval * (self.base**self.fails)
1✔
137

138
    def should_run(self) -> bool:
1✔
139
        if self.fails == 0:
1✔
140
            return True
1✔
141

142
        backoff = self.backoff_seconds()
1✔
143
        self.logger.debug(f"{self.fails=}, {backoff=}")
1✔
144
        return self.last_request + backoff <= time.monotonic()
1✔
145

146

147
petscan_backoff = ExponentialRateLimit(
1✔
148
    interval_seconds=60, base=6, max_fails=4, log_name="petscan_backoff"
149
)
150

151

152
def get_config() -> Tuple[dict, datetime.datetime]:
1✔
153
    """Load on-wiki configuration"""
154
    page = pywikibot.Page(site, "User:iNaturalistReviewBot/config.json")
1✔
155
    conf_json = json.loads(page.text)
1✔
156
    logger.info(f"Loaded config from {page.title(as_link=True)}")
1✔
157
    logger.debug(json.dumps(conf_json, indent=2))
1✔
158
    ts = datetime.datetime.utcnow()
1✔
159
    return conf_json, ts
1✔
160

161

162
def check_config() -> None:
1✔
163
    page = pywikibot.Page(site, "User:iNaturalistReviewBot/config.json")
1✔
164
    if conf_ts and page.latest_revision.timestamp > conf_ts:
1!
165
        raise RestartBot("Configuration has been updated, bot will restart")
×
166

167

168
def init_compare_methods() -> None:
1✔
169
    global compare_methods
170
    compare_methods = []
1✔
171
    if "sha1" in config["compare_methods"]:
1✔
172
        compare_methods.append(("sha1", compare_sha1))
1✔
173
    if "phash" in config["compare_methods"]:
1✔
174
        compare_methods.append(("phash", compare_phash))
1✔
175

176

177
def files_to_check(start: Optional[str] = None) -> Iterator[pywikibot.page.BasePage]:
1✔
178
    """Iterate list of files needing review from Commons"""
179
    category = pywikibot.Category(site, "Category:INaturalist review needed")
×
180
    do_heartbeat()
×
181
    for page in pagegenerators.CategorizedPageGenerator(
×
182
        category, namespaces=6, start=start
183
    ):
184
        yield page
×
185

186

187
def untagged_files_to_check() -> Iterator[pywikibot.page.BasePage]:
1✔
188
    if not (config.get("find_untagged") and petscan_backoff.should_run()):
×
189
        pages = []
×
190
    else:
191
        try:
×
192
            res = session.get(
×
193
                config["petscan_url"], params=config["untagged_petscan_query"]
194
            )
195
            res.raise_for_status()
×
196

197
            data = res.json()
×
198
            assert data["n"] == "result"
×
199
            pages = data["*"][0]["a"]["*"]
×
200
            petscan_backoff.success()
×
201
        except Exception as err:
×
202
            logger.warning(f"Failed to get data from {res.url}", exc_info=err)
×
203
            pages = []
×
204
            petscan_backoff.failure()
×
205
        logger.info(f"Found {len(pages)} untagged files to check")
×
206

207
    # Whether we get data from PetScan is unrelated to the health of inrbot
208
    do_heartbeat()
×
209
    for page_data in pages:
×
210
        yield pywikibot.FilePage(site, title=page_data["title"])
×
211

212

213
def gbif_to_ina_url(url: urllib.parse.ParseResult) -> str:
1✔
214
    path = url.path.split(sep="/")
1✔
215
    if path[1] != "occurrence":
1!
216
        return ""
×
217
    api_url = f"https://api.gbif.org/v1/occurrence/{path[2]}"
1✔
218
    res = session.get(api_url)
1✔
219
    res.raise_for_status()
1✔
220
    return res.json().get("references", "")
1✔
221

222

223
def parse_ina_url(raw_url: str) -> Optional[iNaturalistID]:
1✔
224
    """Parses an iNaturalist URL into an iNaturalistID named tuple"""
225
    url = urllib.parse.urlparse(raw_url)
1✔
226
    path = url.path.split(sep="/")
1✔
227
    netloc = url.netloc.lower()
1✔
228
    if len(path) == 3 and any(
1✔
229
        netloc.endswith(domain) for domain in config["inaturalist_domains"]
230
    ):
231
        return iNaturalistID(type=path[1], id=str(path[2]))
1✔
232
    elif len(path) == 4 and netloc in (
1✔
233
        "inaturalist-open-data.s3.amazonaws.com",
234
        "static.inaturalist.org",
235
    ):
236
        return iNaturalistID(type=path[1], id=str(path[2]))
1✔
237
    elif len(path) == 3 and netloc == "www.gbif.org":
1✔
238
        return parse_ina_url(gbif_to_ina_url(url))
1✔
239
    else:
240
        return None
1✔
241

242

243
class Image:
1✔
244
    def __init__(
1✔
245
        self,
246
        raw: Optional[bytes] = None,
247
        image: Optional[PIL.Image.Image] = None,
248
        sha1: str = "",
249
        phash: Optional[imagehash.ImageHash] = None,
250
    ):
251
        self._raw = raw
1✔
252
        self._image = image
1✔
253
        self._sha1 = sha1
1✔
254
        self._phash = phash
1✔
255

256
    @property
1✔
257
    def phash(self) -> imagehash.ImageHash:
1✔
258
        if not self._phash:
1!
259
            self._phash = imagehash.phash(self.image)
1✔
260
        return self._phash
1✔
261

262
    @property
1✔
263
    def image(self):
1✔
264
        raise NotImplementedError
×
265

266
    def __repr__(self) -> str:
1✔
267
        paras = ", ".join(
×
268
            f"{key}={repr(value)}" for key, value in self.__dict__.items()
269
        )
270
        return f"{type(self).__name__}({paras})"
×
271

272
    def __eq__(self, other):
1✔
273
        if isinstance(other, Image):
1!
274
            return self.id == other.id
×
275
        elif isinstance(other, iNaturalistID):
1!
276
            return self.id == other
1✔
277
        else:
278
            return NotImplemented
×
279

280

281
class iNaturalistImage(Image):
1✔
282
    _cache: Dict[iNaturalistID, str] = {}
1✔
283

284
    def __init__(self, id: iNaturalistID, **kwargs):
1✔
285
        self.id = id
1✔
286
        super().__init__(**kwargs)
1✔
287

288
    @property
1✔
289
    def raw(self) -> bytes:
1✔
290
        if not self._raw:
1!
291
            self._raw = acnutils.retry(get_ina_image, 3, photo=self.id)
1✔
292
        return cast(bytes, self._raw)
1✔
293

294
    @property
1✔
295
    def image(self) -> PIL.Image.Image:
1✔
296
        if not self._image:
1!
297
            self._image = PIL.Image.open(BytesIO(self.raw))
1✔
298
        return self._image
1✔
299

300
    @property
1✔
301
    def sha1(self) -> str:
1✔
302
        if not self._sha1:
1✔
303
            if self.id in self._cache:
1✔
304
                self._sha1 = self._cache[self.id]
1✔
305
            else:
306
                sha1sum = hashlib.sha1()
1✔
307
                sha1sum.update(self.raw)
1✔
308
                self._sha1 = sha1sum.hexdigest()
1✔
309
                self._cache[self.id] = self._sha1
1✔
310
        return self._sha1
1✔
311

312

313
class CommonsImage(Image):
1✔
314
    def __init__(self, page: pywikibot.FilePage, **kwargs):
1✔
315
        self.page = page
1✔
316
        if self.page:
1✔
317
            self.page.get(force=True)
1✔
318
        super().__init__(**kwargs)
1✔
319

320
    @property
1✔
321
    def raw(self):
1✔
322
        return NotImplemented
×
323

324
    @property
1✔
325
    def image(self) -> PIL.Image.Image:
1✔
326
        """Download orignal Commons file and open as a PIL image"""
327
        if not self._image:
1!
328
            url = self.page.get_file_url()
1✔
329
            response = session.get(url)
1✔
330
            response.raise_for_status()
1✔
331
            self._image = PIL.Image.open(BytesIO(response.content))
1✔
332
        return self._image
1✔
333

334
    @property
1✔
335
    def sha1(self) -> str:
1✔
336
        if not self._sha1:
1✔
337
            self._sha1 = self.page.latest_file_info.sha1
1✔
338
        return self._sha1
1✔
339

340

341
def compare_sha1(com_img: CommonsImage, ina_img: iNaturalistImage) -> bool:
1✔
342
    logger.debug(f"Commons sha1sum:     {com_img.sha1}")
1✔
343
    logger.debug(f"iNaturalist sha1sum: {ina_img.sha1}")
1✔
344
    return compare_digest(com_img.sha1, ina_img.sha1)
1✔
345

346

347
def compare_phash(com_img: CommonsImage, ina_img: iNaturalistImage) -> bool:
1✔
348
    diff = com_img.phash - ina_img.phash
1✔
349
    logger.debug(f"PHash Hamming distance: {diff}")
1✔
350
    return diff <= config.get("max_phash_dist", 4)
1✔
351

352

353
def get_ina_image(photo: iNaturalistID, final: bool = False) -> bytes:
1✔
354
    """Download original photo from iNaturalist"""
355
    if photo.url:
1!
356
        extension = photo.url.partition("?")[0].rpartition(".")[2]
1✔
357
        domain = photo.url.partition("//")[2].partition("/")[0]
1✔
358
    else:
359
        extension = "jpeg"
×
360
        domain = "inaturalist-open-data.s3.amazonaws.com"
×
361
    # TODO: Replace this hardcoded URL
362
    url = f"https://{domain}/photos/{photo.id}/original.{extension}"
1✔
363
    response = session.get(url)
1✔
364
    if response.status_code == 403 and not final:
1!
365
        return get_ina_image(photo._replace(url=url.replace("jpeg", "jpg")), final=True)
×
366
    response.raise_for_status()
1✔
367
    return response.content
1✔
368

369

370
def bytes_throttle(length: int) -> None:
1✔
371
    hour_limit = 4.5e9
×
372
    day_limit = 23.5e9
×
373
    logger.debug(f"Content length: {length}")
×
374
    now = datetime.datetime.now()
×
375
    recent_bytes[datetime.datetime.now()] = length
×
376

377
    last_hour = 0
×
378
    last_day = 0
×
379
    for date, val in recent_bytes.copy().items():
×
380
        if now - date <= datetime.timedelta(hours=24):
×
381
            last_day += val
×
382
            if now - date <= datetime.timedelta(hours=1):
×
383
                last_hour += val
×
384
        else:
385
            del recent_bytes[date]
×
386

387
    logger.debug(f"Hour total: {last_hour}, day total: {last_day}")
×
388
    if last_day >= day_limit:
×
389
        logger.error(
×
390
            f"{last_day} bytes transferred in last 24h, approaching iNaturalist limits!"
391
        )
392
        sleep_time = 3600 * 12  # 12 hours
×
393
    elif last_hour >= hour_limit:
×
394
        logger.error(
×
395
            f"{last_hour} bytes transferred in last hour, "
396
            "approaching iNaturalist limits!"
397
        )
398
        sleep_time = 60 * 30  # 30 minutes
×
399
    else:
400
        return None
×
401
    logger.info(f"Sleeping for {sleep_time} seconds")
×
402
    time.sleep(sleep_time)
×
403
    return None
×
404

405

406
def do_heartbeat() -> None:
1✔
407
    """Update the timestamp on a file (if provided)
408

409
    Works with inrbot-healthcheck.sh when the HEARTBEAT_FILE environment variable is set
410
    """
411
    if os.environ.get("HEARTBEAT_FILE"):
1!
412
        Path(os.environ["HEARTBEAT_FILE"]).touch()
×
413

414

415
class Aliases:
1✔
416
    alias_cache: Dict[str, Dict[str, Union[float, Set[str]]]] = {}
1✔
417

418
    def __init__(self, title: str) -> None:
1✔
419
        self.title: str = title
1✔
420
        self._aliases: Optional[Set[str]] = None
1✔
421

422
    def get_aliases(self) -> None:
1✔
423
        canon_page = pywikibot.Page(site, f"Template:{self.title}")
1✔
424
        aliases = {
1✔
425
            page.title(with_ns=False).lower()
426
            for page in canon_page.backlinks(filter_redirects=True, namespaces=10)
427
        }
428
        aliases.add(canon_page.title(with_ns=False).lower())
1✔
429
        aliases.update(
1✔
430
            page.title(with_ns=False).lower().partition("/")[0]
431
            for page in canon_page.embeddedin(namespaces=10)
432
        )
433
        self._aliases = aliases
1✔
434

435
    @property
1✔
436
    def aliases(self):
1✔
437
        if self._aliases is None:
1✔
438
            cached = self.alias_cache.get(self.title)
1✔
439
            if cached is None or time.monotonic() - cached["last_update"] > 3600:
1✔
440
                self.get_aliases()
1✔
441
                self.alias_cache[self.title] = {
1✔
442
                    "last_update": time.monotonic(),
443
                    "aliases": self._aliases,
444
                }
445
            else:
446
                self._aliases = cached["aliases"]
1✔
447
        return self._aliases
1✔
448

449
    def is_license(self, template: mwph.nodes.Template) -> bool:
1✔
450
        if template.name.lower() in self.aliases:
1✔
451
            return True
1✔
452
        elif template.name.lower() == "self":
1!
453
            return True
×
454
        return False
1✔
455

456

457
def get_observation_from_photo(photo_id: iNaturalistID) -> iNaturalistID:
1✔
458
    assert photo_id.type == "photos"
1✔
459
    try:
1✔
460
        res = session.get(str(photo_id))
1✔
461
        res.raise_for_status()
1✔
462
    except Exception:
×
463
        raise ProcessingError("nourl", "No observation ID could be found")
×
464
    # Yes, I know I'm parsing HTML with a regex.
465
    match = re.search(r"/observations/(\d*)\"", res.text)
1✔
466
    if not match:
1!
467
        raise ProcessingError("nourl", "No observation ID could be found")
×
468
    else:
469
        return iNaturalistID(type="observations", id=match.group(1))
1✔
470

471

472
class CommonsPage:
1✔
473
    def __init__(
1✔
474
        self,
475
        page: pywikibot.FilePage,
476
        throttle: Optional[acnutils.Throttle] = None,
477
        ina_throttle: acnutils.Throttle = acnutils.Throttle(10),
478
    ) -> None:
479
        self.page = page
1✔
480
        self._com_license: Optional[str] = None
1✔
481
        self._ina_license: Optional[str] = None
1✔
482
        self._status = ""
1✔
483
        self._ina_author: Optional[str] = None
1✔
484
        self._ina_data: dict = {}
1✔
485
        self._is_old: Optional[bool] = None
1✔
486
        self._no_del: Optional[bool] = None
1✔
487
        self._archive = ""
1✔
488
        self.throttle = throttle
1✔
489
        self.ina_throttle = ina_throttle
1✔
490
        self.reason = ""
1✔
491
        self._photo_id: Optional[iNaturalistID] = None
1✔
492
        self._raw_photo_id: Optional[iNaturalistID] = None
1✔
493
        self._obs_id: Optional[iNaturalistID] = None
1✔
494
        self._locked = False
1✔
495
        self.photo_id_source = ""
1✔
496
        self.log_page = pywikibot.Page(site, config["untagged_log_page"])
1✔
497

498
    @property
1✔
499
    def locked(self) -> bool:
1✔
500
        return self._locked
1✔
501

502
    @locked.setter
1✔
503
    def locked(self, value: bool):
1✔
504
        if self._locked is False:
1!
505
            self._locked = value
1✔
506
        elif value is False:
×
507
            raise TypeError("Can not unlock parameters")
×
508

509
    def lock(self):
1✔
510
        if self.locked is False:
1✔
511
            for hook in lock_hooks:
1!
512
                hook(self)
×
513
            self.locked = True
1✔
514

515
    def _set_locking(self, attr: str, value: Any) -> None:
1✔
516
        if not self.locked:
1✔
517
            setattr(self, attr, value)
1✔
518
        else:
519
            raise TypeError(f"{attr[1:]} has already been read, and can not be changed")
1✔
520

521
    def _get_locking_str(self, attr: str, setter: Optional[Callable] = None) -> str:
1✔
522
        if getattr(self, attr) is None:
1✔
523
            if self.locked:
1!
524
                setattr(self, attr, "")
1✔
525
            elif setter is not None:
×
526
                setter()
×
527
            else:
528
                raise AttributeError(attr[1:])
×
529
        return getattr(self, attr)
1✔
530

531
    def check_can_run(self) -> bool:
1✔
532
        """Determinies if the bot should run on this page and returns a bool."""
533
        page = self.page
1✔
534
        if (
1✔
535
            # Skip files that are still reported as an error
536
            (not self.check_has_template() and self.check_untagged_log())
537
            # Skip if the bot can't edit the page, due to permissions or {{bots}}
538
            or (not page.has_permission("edit"))
539
            or (not page.botMayEdit())
540
            # Skip if there's already a review template with parameters
541
            or (re.search(r"{{[iI][nN]aturalist[rR]eview *?\|.*?}}", page.text))
542
        ):
543
            return False
1✔
544
        else:
545
            return True
1✔
546

547
    def check_has_template(self) -> bool:
1✔
548
        return bool(re.search(r"{{[iI][nN]aturalist[rR]eview", self.page.text))
1✔
549

550
    def check_stop_cats(self) -> None:
1✔
551
        stop_cats = {
1✔
552
            pywikibot.Category(site, title) for title in config["stop_categories"]
553
        }
554
        page_cats = set(self.page.categories())
1✔
555
        page_stop = stop_cats & page_cats
1✔
556
        if page_stop:
1✔
557
            raise StopReview(str(page_stop))
1✔
558

559
    def find_ina_id(self) -> None:
1✔
560
        """Returns an iNaturalistID tuple from wikitext"""
561
        photos = []
1✔
562
        observations = []
1✔
563

564
        for url in self.page.extlinks():
1✔
565
            url_id = parse_ina_url(url)
1✔
566
            if (
1✔
567
                url_id is None
568
                or re.search(r"[A-z]", url_id.id)
569
                or url_id in photos
570
                or url_id in observations
571
            ):
572
                continue  # pragma: no cover
573
            elif url_id.type == "observations":
1✔
574
                observations.append(url_id)
1✔
575
            elif url_id.type == "photos":
1!
576
                photos.append(url_id)
1✔
577

578
        for hook in id_hooks:
1✔
579
            hook_id = hook(self, observations=observations.copy(), photos=photos.copy())
1✔
580
            if hook_id is None or re.search(r"[A-z]", hook_id.id):
1✔
581
                continue  # pragma: no cover
582
            elif hook_id.type == "observations":
1✔
583
                observations.insert(0, hook_id)
1✔
584
            elif hook_id.type == "photos":
1!
585
                photos.insert(0, hook_id)
1✔
586
                observations = []
1✔
587

588
        if photos and observations:
1✔
589
            self.obs_id = observations[0]
1✔
590
            self.raw_photo_id = photos[0]
1✔
591
        elif observations:
1✔
592
            self.obs_id = observations[0]
1✔
593
            self.raw_photo_id = None
1✔
594
        elif photos:
1✔
595
            self.obs_id = None
1✔
596
            self.raw_photo_id = photos[0]
1✔
597
        else:
598
            raise ProcessingError("nourl", "No observation ID could be found")
1✔
599

600
    @property
1✔
601
    def photo_id(self) -> Optional[iNaturalistID]:
1✔
602
        return self._photo_id
1✔
603

604
    @photo_id.setter
1✔
605
    def photo_id(self, value: iNaturalistID):
1✔
606
        self._set_locking("_photo_id", value)
1✔
607

608
    @property
1✔
609
    def raw_photo_id(self) -> Optional[iNaturalistID]:
1✔
610
        return self._raw_photo_id
1✔
611

612
    @raw_photo_id.setter
1✔
613
    def raw_photo_id(self, value: iNaturalistID):
1✔
614
        self._raw_photo_id = value
1✔
615

616
    @property
1✔
617
    def obs_id(self) -> Optional[iNaturalistID]:
1✔
618
        if not self._obs_id and not self.locked:
1✔
619
            if self.raw_photo_id:
1✔
620
                self._obs_id = get_observation_from_photo(self.raw_photo_id)
1✔
621
        return self._obs_id
1✔
622

623
    @obs_id.setter
1✔
624
    def obs_id(self, value: iNaturalistID) -> None:
1✔
625
        self._set_locking("_obs_id", value)
1✔
626

627
    @obs_id.deleter
1✔
628
    def obs_id(self) -> None:
1✔
629
        if not self.locked:
1!
630
            self._obs_id = None
1✔
631
            del self.ina_data
1✔
632
        else:
633
            raise TypeError
×
634

635
    @property
1✔
636
    def ina_data(self) -> dict:
1✔
637
        """Make API request to iNaturalist from an ID and ID type
638

639
        Returns a dict of the API result
640
        """
641
        if not self._ina_data:
1✔
642
            assert self.obs_id
1✔
643
            if self.obs_id.type == "observations":
1✔
644
                url = f"https://api.inaturalist.org/v1/observations/{self.obs_id.id}"
1✔
645
            else:
646
                raise ProcessingError("apierr", "iNaturalist ID is wrong type")
1✔
647

648
            if self.throttle:
1!
649
                self.throttle.throttle()
×
650
            try:
1✔
651
                response = session.get(url, headers={"Accept": "application/json"})
1✔
652
                response.raise_for_status()
1✔
653
                response_json = response.json()
1✔
654
            except (ValueError, requests.exceptions.HTTPError) as err:
1✔
655
                raise ProcessingError("apierr", "iNaturalist API error") from err
1✔
656
            else:
657
                if response_json.get("total_results") != 1:
1✔
658
                    logger.debug(response_json)
1✔
659
                    raise ProcessingError("apierr", f"iNaturalist API error in {url}")
1✔
660
                res = response_json.get("results", [None])[0]
1✔
661
                if not res:
1✔
662
                    raise ProcessingError(
1✔
663
                        "apierr", f"No data recieved from iNaturalist in {url}"
664
                    )
665
                self._ina_data = res
1✔
666
        return self._ina_data
1✔
667

668
    @ina_data.deleter
1✔
669
    def ina_data(self) -> None:
1✔
670
        self._ina_data = {}
1✔
671

672
    def get_ina_license(self) -> None:
1✔
673
        """Find the image license in the iNaturalist API response
674

675
        If a license is found, the Commons template name is returned.
676
        If no license is found, an empty string is returned.
677

678
        The API does not return CC version numbers, but the website has 4.0 links.
679
        CC 4.0 licenses are assumed.
680
        """
681
        assert self.photo_id
1✔
682
        licenses = config["ina_licenses"]
1✔
683
        photos: list = self.ina_data.get("photos", [])
1✔
684
        for photo_data in photos:
1✔
685
            if str(photo_data.get("id")) == self.photo_id.id:
1✔
686
                license_code = photo_data.get("license_code", "null")
1✔
687
                break
1✔
688
        else:
689
            raise ProcessingError("inatlicense", "No iNaturalist license found")
1✔
690

691
        if not license_code:
1!
692
            license_code = "null"
×
693

694
        try:
1✔
695
            self.ina_license = licenses[license_code]
1✔
696
        except KeyError as e:
×
697
            raise ProcessingError("inatlicense", "No iNaturalist license found") from e
×
698
        logger.info(f"iNaturalist License: {self.ina_license}")
1✔
699

700
    @property
1✔
701
    def ina_license(self) -> str:
1✔
702
        return self._get_locking_str("_ina_license", self.get_ina_license)
1✔
703

704
    @ina_license.setter
1✔
705
    def ina_license(self, value: str) -> None:
1✔
706
        self._set_locking("_ina_license", value)
1✔
707

708
    def find_photo_in_obs(self, recurse: bool = True) -> None:
1✔
709
        """Find the matching image in an iNaturalist observation
710

711
        Returns an iNaturalistID named tuple with the photo ID.
712
        """
713
        images = [
1✔
714
            iNaturalistImage(
715
                id=iNaturalistID(type="photos", id=str(photo["id"]), url=photo["url"])
716
            )
717
            for photo in self.ina_data["photos"]
718
        ]
719
        if len(images) < 1:
1✔
720
            raise ProcessingError("notfound", "No photos in observation")
1✔
721
        elif self.raw_photo_id:
1✔
722
            # False sorts before True, otherwise remains in original order
723
            # This will sort the matching photo before other photos in the obs,
724
            # but will still check those other images if no match.
725
            images.sort(key=lambda image: self.raw_photo_id != image)
1✔
726

727
        commons_image = CommonsImage(page=self.page)
1✔
728

729
        for comp_method, comp_func in compare_methods:
1✔
730
            logger.info(f"Comparing photos using {comp_method}")
1✔
731
            for image in images:
1✔
732
                logger.debug(f"Comparing {str(image.id)}")
1✔
733
                try:
1✔
734
                    res = comp_func(com_img=commons_image, ina_img=image)
1✔
735
                except Exception:
×
736
                    res = False
×
737
                if res:
1✔
738
                    logger.info(f"Match found: {str(image.id)}")
1✔
739
                    self.reason = comp_method
1✔
740
                    self.photo_id = image.id
1✔
741
                    return
1✔
742
                elif self.throttle:
1!
743
                    self.throttle.throttle()
×
744
        if self.raw_photo_id and self.raw_photo_id not in images and recurse:
1✔
745
            del self.obs_id
1✔
746
            self.find_photo_in_obs(recurse=False)
1✔
747
        else:
748
            raise ProcessingError("notmatching", "No matching photos found")
1✔
749

750
    def get_ina_author(self):
1✔
751
        self.ina_author = self.ina_data.get("user", {}).get("login", "")
1✔
752
        logger.info(f"Author: {self.ina_author}")
1✔
753

754
    @property
1✔
755
    def ina_author(self) -> str:
1✔
756
        """Find the image author in the iNaturalist API response
757

758
        Returns a string with the username of the iNaturalist contributor
759
        """
760
        return self._get_locking_str("_ina_author", self.get_ina_author)
1✔
761

762
    @ina_author.setter
1✔
763
    def ina_author(self, value: str) -> None:
1✔
764
        self._set_locking("_ina_author", value)
1✔
765

766
    def get_com_license(self):
1✔
767
        """Find the license template currently used on the Commons page
768

769
        Returns the first license template used on the page. If no templates
770
        are found, return an empty string.
771
        """
772

773
        category = pywikibot.Category(site, "Category:Primary license tags (flat list)")
1✔
774
        templates = set(self.page.itertemplates())
1✔
775
        license_tags = set(category.members(namespaces=10))
1✔
776

777
        for template in templates:
1✔
778
            if template in license_tags:
1✔
779
                self._com_license = template.title(with_ns=False)
1✔
780
                break
1✔
781
        else:
782
            logger.info("No Commons license found!")
1✔
783
            self._com_license = ""
1✔
784
        logger.info(f"Commons License: {self.com_license}")
1✔
785

786
    @property
1✔
787
    def com_license(self) -> str:
1✔
788
        return self._get_locking_str("_com_license", self.get_com_license)
1✔
789

790
    @com_license.setter
1✔
791
    def com_license(self, value: str) -> None:
1✔
792
        self._set_locking("_com_license", value)
1✔
793

794
    def compare_licenses(self) -> None:
1✔
795
        free_licenses = set(config["free_licenses"])
1✔
796

797
        if not self.ina_license:
1✔
798
            # iNaturalist license wasn't found, call in the humans
799
            self.status = "error"
1✔
800
        elif self.ina_license not in free_licenses:
1✔
801
            # Source license is non-free, failed license review
802
            self.status = "fail"
1✔
803
        elif self.ina_license == self.com_license:
1✔
804
            # Licenses are the same, license review passes
805
            self.status = "pass"
1✔
806
        else:
807
            # Commons license doesn't match iNaturalist, update to match
808
            self.status = "pass-change"
1✔
809

810
    @property
1✔
811
    def status(self) -> str:
1✔
812
        """Checks the Commons license against the iNaturalist license
813

814
        Returns a string with the status
815
        Statuses:
816
            fail:       iNaturalist license is non-free
817
            error:      Bot could not determine
818
            pass:       Licenses match
819
            pass-change: Commons license changed to free iNaturalist license
820
        """
821
        if not self.locked:
1✔
822
            if not self._status:
1✔
823
                self.compare_licenses()
1✔
824
            for hook in status_hooks:
1✔
825
                hook(self)
1✔
826
        return self._status
1✔
827

828
    @status.setter
1✔
829
    def status(self, value):
1✔
830
        self._set_locking("_status", value)
1✔
831

832
    @status.deleter
1✔
833
    def status(self):
1✔
834
        self.status = ""
1✔
835

836
    def _file_is_old(self) -> bool:
1✔
837
        if not config.get("old_fail", False):
1✔
838
            return False
1✔
839

840
        timestamp = self.page.latest_file_info.timestamp
1✔
841
        if (datetime.datetime.now() - timestamp) > datetime.timedelta(
1✔
842
            days=config["old_fail_age"]
843
        ):
844
            return True
1✔
845
        else:
846
            return False
1✔
847

848
    @property
1✔
849
    def is_old(self) -> bool:
1✔
850
        if self._is_old is None:
1✔
851
            if self.status == "fail":
1✔
852
                self._is_old = self._file_is_old()
1✔
853
            else:
854
                self._is_old = False
1✔
855
        return self._is_old
1✔
856

857
    @is_old.setter
1✔
858
    def is_old(self, value: bool) -> None:
1✔
859
        self._set_locking("_is_old", value)
1✔
860

861
    @property
1✔
862
    def no_del(self) -> bool:
1✔
863
        if self._no_del is None:
1✔
864
            if self.status == "fail":
1✔
865
                page_templates = set(self.page.itertemplates())
1✔
866
                check_templates = {
1✔
867
                    pywikibot.Page(site, "Template:OTRS received"),
868
                    pywikibot.Page(site, "Template:Deletion template tag"),
869
                }
870
                self._no_del = not page_templates.isdisjoint(check_templates)
1✔
871
            else:
872
                self._no_del = False
1✔
873
        return self._no_del
1✔
874

875
    @no_del.setter
1✔
876
    def no_del(self, value) -> None:
1✔
877
        self._set_locking("_no_del", value)
1✔
878

879
    @property
1✔
880
    def archive(self) -> str:
1✔
881
        if not self._archive:
1✔
882
            if config.get("use_wayback") and self.status in ("pass", "pass-change"):
1!
883
                self.get_old_archive()
×
884
                if not self._archive:
×
885
                    self.save_archive()
×
886
            elif self.status == "fail" or (
1✔
887
                self.status != "error" and config.get("wayback_get", True)
888
            ):
889
                self.get_old_archive()
1✔
890
        return self._archive
1✔
891

892
    @archive.setter
1✔
893
    def archive(self, value: str) -> None:
1✔
894
        self._archive = value
1✔
895

896
    def save_archive(self) -> None:
1✔
897
        try:
×
898
            url = waybackpy.Url(str(self.photo_id), user_agent).save()
×
899
            assert url.archive_url is not None
×
900
            self.archive = url.archive_url
×
901
        except Exception as err:
×
902
            logger.warn("Failed to get archive", exc_info=err)
×
903
            self.archive = ""
×
904

905
    def get_old_archive(self) -> None:
1✔
906
        try:
×
907
            url = waybackpy.Url(str(self.photo_id), user_agent).oldest()
×
908
            assert url.archive_url is not None
×
909
            self.archive = url.archive_url
×
910
        except Exception as err:
×
911
            logger.info("Failed to get archive", exc_info=err)
×
912
            self.archive = ""
×
913
        else:
914
            if self.status == "fail":
×
915
                self.status = "fail-archive"
×
916

917
    def uploader_talk(self) -> pywikibot.page.Page:
1✔
918
        return pywikibot.Page(site, f"User talk:{self.page.oldest_file_info.user}")
1✔
919

920
    def update_review(self) -> bool:
1✔
921
        """Updates the wikitext with the review status"""
922
        logger.info(f"Status: {self.status} ({self.reason})")
1✔
923
        self.lock()
1✔
924
        code = mwph.parse(self.page.text)
1✔
925
        template = self.make_template()
1✔
926
        changed = False
1✔
927
        if self.check_has_template():
1✔
928
            # Already tagged for review, replace the existing template
929
            for review_template in code.ifilter_templates(
1✔
930
                matches=lambda t: t.name.strip().lower() == "inaturalistreview"
931
            ):
932
                code.replace(review_template, template)
1✔
933
                changed = True
1✔
934
        else:
935
            # Check for normal {{LicenseReview}} template
936
            for review_template in code.ifilter_templates(
1!
937
                matches=lambda t: re.search(r"[Ll]icense ?[Rr]eview", str(t))
938
            ):
939
                code.replace(review_template, template)
×
940
                changed = True
×
941

942
            if not changed:
1!
943
                # Not already tagged, try to put the review template under the license
944
                if self.com_license:
1✔
945
                    aliases = Aliases(self.com_license)
1✔
946
                    for pt2 in code.ifilter_templates(matches=aliases.is_license):
1✔
947
                        code.insert_after(pt2, "\n" + template)
1✔
948
                        changed = True
1✔
949
                else:
950
                    for node in code.ifilter(
1!
951
                        matches=lambda n: re.search(
952
                            r"(\[\[Category:|\{\{Uncategorized)", str(n)
953
                        )
954
                    ):
955
                        code.insert_before(node, template + "\n\n")
1✔
956
                        changed = True
1✔
957
                        break
1✔
958
                    else:
959
                        code.append("\n\n" + template)
×
960
                        changed = True
×
961

962
        if not changed:
1✔
963
            logger.info("Page not changed")
1✔
964
            return False
1✔
965

966
        if self.status == "pass-change":
1✔
967
            if self.com_license:
1✔
968
                aliases = Aliases(self.com_license)
1✔
969
                for pt2 in code.ifilter_templates(matches=aliases.is_license):
1✔
970
                    code.replace(pt2, ("{{%s}}" % self.ina_license))
1✔
971
            else:
972
                code.insert_before(template, ("{{%s}}" % self.ina_license))
1✔
973

974
        if self.status == "fail" and not self.no_del:
1✔
975
            code.insert(
1✔
976
                0,
977
                string.Template(
978
                    config["old_fail_tag"] if self.is_old else config["fail_tag"]
979
                ).safe_substitute(
980
                    review_license=self.ina_license,
981
                    source_url=str(self.photo_id) if self.photo_id else "",
982
                ),
983
            )
984

985
        if self.status in ["pass", "pass-change"] and config.get("tag_source"):
1✔
986
            self.add_source_tag(code)
1✔
987

988
        if self.throttle is not None:
1!
989
            self.throttle.throttle()
×
990
        try:
1✔
991
            self.save_page(str(code))
1✔
992
        except Exception as err:
×
993
            logging.exception(err)
×
994
            return False
×
995
        else:
996
            return True
1✔
997

998
    def make_template(self) -> str:
1✔
999
        """Constructs the iNaturalistreview template"""
1000
        self.lock()
1✔
1001
        if self.status == "stop":
1✔
1002
            return ""
1✔
1003
        template = string.Template(config[self.status])
1✔
1004
        text = template.safe_substitute(
1✔
1005
            status=self.status,
1006
            author=self.ina_author,
1007
            source_url=str(self.photo_id) if self.photo_id else "",
1008
            review_date=datetime.date.today().isoformat(),
1009
            reviewer=username,
1010
            review_license=self.ina_license,
1011
            upload_license=self.com_license,
1012
            reason=self.reason,
1013
            archive=self.archive,
1014
        )
1015
        return text
1✔
1016

1017
    def add_source_tag(self, code: mwph.wikicode.Wikicode) -> None:
1✔
1018
        source_tag = ""
1✔
1019
        templates = set(self.page.itertemplates())
1✔
1020
        if not self.obs_id or not config["tag_source"]:
1✔
1021
            return
1✔
1022
        if pywikibot.Page(site, "Template:INaturalist") not in templates:
1!
1023
            source_tag += "\n{{iNaturalist|%s}}" % self.obs_id.id
1✔
1024

1025
        gbif_links = [
1✔
1026
            link
1027
            for link in self.ina_data.get("outlinks", [])
1028
            if link["source"] == "GBIF"
1029
        ]
1030
        if gbif_links and pywikibot.Page(site, "Template:Gbif") not in templates:
1!
1031
            gbif_id = gbif_links[0]["url"].split("/")[-1]
1✔
1032
            source_tag += "\n{{gbif|%s}}" % gbif_id
1✔
1033

1034
        if not source_tag:
1!
1035
            return
×
1036

1037
        try:
1✔
1038
            # Place templates at the bottom of =={{int:filedesc}}==,
1039
            # after any other templates but before categories/other text
1040
            prev = code.get_sections(matches="filedesc")[0].filter_templates(
1✔
1041
                recursive=False
1042
            )[-1]
1043
        except IndexError:
1✔
1044
            # If there is no Summary section, just place after {{iNaturalistreview}}
1045
            prev = code.filter_templates(
1✔
1046
                matches=lambda t: t.name.strip().lower() == "inaturalistreview"
1047
            )[0]
1048

1049
        code.insert_after(prev, source_tag)
1✔
1050

1051
    def save_page(self, new_text: str) -> None:
1✔
1052
        """Replaces the wikitext of the specified page with new_text
1053

1054
        If the global simulate variable is true, the wikitext will be printed
1055
        instead of saved to Commons.
1056
        """
1057

1058
        summary = string.Template(config["review_summary"]).safe_substitute(
1✔
1059
            status=self.status,
1060
            review_license=self.ina_license,
1061
            version=__version__,
1062
            tag=summary_tag,
1063
        )
1064
        for hook in pre_save_hooks:
1!
1065
            hook(
×
1066
                self,
1067
                new_text=new_text,
1068
                summary=summary,
1069
            )
1070
        if not simulate:
1✔
1071
            acnutils.check_runpage(site, override=run_override)
1✔
1072
            logger.info(f"Saving {self.page.title()}")
1✔
1073
            acnutils.retry(
1✔
1074
                acnutils.save_page,
1075
                3,
1076
                text=new_text,
1077
                page=self.page,
1078
                summary=summary,
1079
                bot=False,
1080
                minor=False,
1081
            )
1082
        else:
1083
            logger.info("Saving disabled")
1✔
1084
            logger.debug(summary)
1✔
1085
            logger.debug(new_text)
1✔
1086

1087
    def fail_warning(self) -> None:
1✔
1088
        user_talk = self.uploader_talk()
1✔
1089
        message = string.Template(
1✔
1090
            config["old_fail_warn"] if self.is_old else config["fail_warn"]
1091
        ).safe_substitute(
1092
            filename=self.page.title(with_ns=True),
1093
            review_license=self.ina_license,
1094
            source_url=str(self.photo_id) if self.photo_id else "",
1095
        )
1096
        summary = string.Template(config["review_summary"]).safe_substitute(
1✔
1097
            status="fail",
1098
            review_license=self.ina_license,
1099
            version=__version__,
1100
            tag=summary_tag,
1101
        )
1102
        if not simulate:
1!
1103
            acnutils.check_runpage(site, override=run_override)
1✔
1104
            logger.info(f"Saving {user_talk.title()}")
1✔
1105
            acnutils.retry(
1✔
1106
                acnutils.save_page,
1107
                3,
1108
                text=message,
1109
                page=user_talk,
1110
                summary=summary,
1111
                bot=False,
1112
                minor=False,
1113
                mode="append",
1114
            )
1115
        else:
1116
            logger.info("Saving disabled")
×
1117
            logger.info(summary)
×
1118
            logger.info(message)
×
1119

1120
    def log_untagged_error(self) -> None:
1✔
1121
        if simulate:
×
1122
            return
×
1123
        if self.page.title() not in self.log_page.text:
×
1124
            message = string.Template(config["untagged_log_line"]).safe_substitute(
×
1125
                status=self.status,
1126
                reason=self.reason,
1127
                link=self.page.title(as_link=True, textlink=True),
1128
            )
1129
            summary = string.Template(config["untagged_log_summary"]).safe_substitute(
×
1130
                status=self.status,
1131
                reason=self.reason,
1132
                link=self.page.title(as_link=True, textlink=True),
1133
                version=__version__,
1134
                tag=summary_tag,
1135
            )
1136
            acnutils.check_runpage(site, override=run_override)
×
1137
            acnutils.retry(
×
1138
                acnutils.save_page,
1139
                3,
1140
                text=message,
1141
                page=self.log_page,
1142
                summary=summary,
1143
                bot=False,
1144
                minor=False,
1145
                mode="append",
1146
            )
1147

1148
    def remove_untagged_log(self) -> None:
1✔
1149
        """
1150
        Removes a file from the untagged error log
1151
        """
1152
        new_text, changes = re.subn(
1✔
1153
            r"^.*?{0}.*\n?".format(re.escape(str(self.page.title()))),
1154
            "",
1155
            self.log_page.text,
1156
            flags=re.MULTILINE,
1157
        )
1158
        summary = string.Template(
1✔
1159
            config["untagged_remove_log_summary"]
1160
        ).safe_substitute(
1161
            link=self.page.title(as_link=True, textlink=True),
1162
            version=__version__,
1163
            tag=summary_tag,
1164
        )
1165

1166
        if changes == 0:
1!
1167
            return
1✔
1168
        if simulate:
×
1169
            logger.debug(summary)
×
1170
            logger.debug(new_text)
×
1171
        else:
1172
            acnutils.retry(
×
1173
                acnutils.save_page,
1174
                3,
1175
                text=new_text,
1176
                page=self.log_page,
1177
                summary=summary,
1178
                bot=False,
1179
                minor=False,
1180
            )
1181

1182
    def check_untagged_log(self) -> bool:
1✔
1183
        """
1184
        Returns True if the file is on the untagged log
1185
        """
1186
        for page in self.log_page.linkedPages(namespaces=6):
1✔
1187
            if page == self.page:
1✔
1188
                return True
1✔
1189
        return False
1✔
1190

1191
    def review_file(
1✔
1192
        self, throttle: Optional[acnutils.Throttle] = None
1193
    ) -> Optional[bool]:
1194
        """Performs a license review on the input page
1195

1196
        inpage must be in the file namespace.
1197

1198
        Returns None if the file was skipped
1199
        Returns False if there was an error during review
1200
        Returns True if the file was successfully reviewed (pass or fail)
1201
        """
1202
        logger.info(f"Checking {self.page.title(as_link=True)}")
1✔
1203

1204
        acnutils.check_runpage(site, override=run_override)
1✔
1205
        if not self.check_can_run():
1✔
1206
            return None
1✔
1207

1208
        #####
1209
        try:
1✔
1210
            self.check_stop_cats()
1✔
1211
            # Get iNaturalistID
1212
            self.find_ina_id()
1✔
1213
            logger.info(f"ID found in wikitext: {self.obs_id} {self.raw_photo_id}")
1✔
1214

1215
            try:
1✔
1216
                self.find_photo_in_obs()
1✔
1217
            except ProcessingError as err:
×
1218
                if (
×
1219
                    err.reason_code in ("apierr", "notfound")
1220
                    and self.raw_photo_id
1221
                    and self.obs_id
1222
                ):
1223
                    # Observation ID probably doesn't exist.
1224
                    # If we've got a photo ID, try that.
1225
                    del self.obs_id
×
1226
                    self.find_photo_in_obs()
×
1227
                else:
1228
                    raise
×
1229
            self.compare_licenses()
1✔
1230
            self.get_ina_author()
1✔
1231
            self.archive
1✔
1232

1233
        except ProcessingError as err:
1✔
1234
            logger.info("Processing failed:", exc_info=err)
1✔
1235
            self.status = "error"
1✔
1236
            self.reason = err.reason_code
1✔
1237
        except StopReview as err:
1✔
1238
            logger.info(f"Image already reviewed, contains {err.reason}")
1✔
1239
            self.status = "stop"
1✔
1240
            return False
1✔
1241
        except (acnutils.RunpageError, KeyboardInterrupt, ConnectionError) as err:
1✔
1242
            raise err
1✔
1243
        except Exception as err:
1✔
1244
            logger.exception(err)
1✔
1245
            self.status = "error"
1✔
1246
            self.reason = repr(err)
1✔
1247

1248
        if self.status == "error" and not self.check_has_template():
1✔
1249
            # Not previously tagged, don't need to throw an error message on it.
1250
            logger.info("Skipping...")
1✔
1251
            self.log_untagged_error()
1✔
1252
            # TODO: report out failures/maintain skip list
1253

1254
            return False
1✔
1255
        reviewed = self.update_review()
1✔
1256
        if self.status == "fail" and reviewed and not self.no_del:
1✔
1257
            self.fail_warning()
1✔
1258

1259
        if reviewed:
1✔
1260
            self.remove_untagged_log()
1✔
1261

1262
        return reviewed
1✔
1263

1264

1265
def main(
1✔
1266
    page: Optional[pywikibot.page.BasePage] = None,
1267
    total: int = 0,
1268
    start: Optional[str] = None,
1269
) -> None:
1270
    """Main loop for program"""
1271
    # Enumerate starts at 0, so to get N items, count to N-1.
1272
    if page:
1✔
1273
        # When given a page, check only that page
1274
        cpage = CommonsPage(pywikibot.FilePage(page))
1✔
1275
        cpage.review_file()
1✔
1276
    else:
1277
        # Otherwise, run automatically
1278
        # If total is 0, run continuously.
1279
        # If total is non-zero, check that many files
1280
        logger.info("Beginning loop")
1✔
1281
        i = 0
1✔
1282
        running = True
1✔
1283
        throttle = acnutils.Throttle(config.get("edit_throttle", 60))
1✔
1284
        while (not total) or (i < total):
1✔
1285
            for page in itertools.chain(
1✔
1286
                files_to_check(start), untagged_files_to_check()
1287
            ):
1288
                do_heartbeat()
1✔
1289
                try:
1✔
1290
                    cpage = CommonsPage(pywikibot.FilePage(page))
1✔
1291
                except ValueError:
×
1292
                    continue
×
1293

1294
                if total and i >= total:
1✔
1295
                    break
1✔
1296
                i += 1
1✔
1297

1298
                try:
1✔
1299
                    check_config()
1✔
1300
                    cpage.review_file()
1✔
1301
                except (acnutils.RunpageError, RestartBot, ConnectionError) as err:
1✔
1302
                    # Blocks and runpage checks always stop
1303
                    logger.exception(err)
1✔
1304
                    raise
1✔
1305
                except Exception as err:
1✔
1306
                    if running:
1✔
1307
                        logger.exception(err)
1✔
1308
                        running = False
1✔
1309
                    else:
1310
                        # If this exception happened after running out
1311
                        # of pages or another exception, stop the bot.
1312
                        logger.exception(err)
1✔
1313
                        raise
1✔
1314
                else:
1315
                    running = True
1✔
1316
                throttle.throttle()
1✔
1317
            else:
1318
                # If the for loop drops out, there are no more pages right now
1319
                if running:
1!
1320
                    running = False
1✔
1321
                    logger.warning("Out of pages to check!")
1✔
1322
                # May need to adjust this number depending on load
1323
                else:
1324
                    time.sleep(60)
×
1325

1326

1327
config, conf_ts = get_config()
1✔
1328
init_compare_methods()
1✔
1329
if __name__ == "__main__":
1!
1330
    parser = argparse.ArgumentParser(
×
1331
        description="Review files from iNaturalist on Commons",
1332
        prog="iNaturalistReviewer",
1333
    )
1334
    run_method = parser.add_mutually_exclusive_group(required=True)
×
1335
    run_method.add_argument(
×
1336
        "--auto", action="store_true", help="run the bot automatically"
1337
    )
1338
    run_method.add_argument(
×
1339
        "--file", action="store", help="run the bot only on the specified file"
1340
    )
1341
    parser.add_argument(
×
1342
        "--total",
1343
        action="store",
1344
        type=int,
1345
        help="review no more than this number of files in automatic mode",
1346
        default=0,
1347
    )
1348
    parser.add_argument(
×
1349
        "--ignore-runpage",
1350
        action="store_true",
1351
        dest="ignore_runpage",
1352
        help="skip the runpage check for testing",
1353
    )
1354
    parser.add_argument(
×
1355
        "--start",
1356
        action="store",
1357
        help="sortkey to start iterating at",
1358
        default=None,
1359
    )
1360
    sim = parser.add_mutually_exclusive_group()
×
1361
    sim.add_argument(
×
1362
        "--simulate",
1363
        action="store_true",
1364
        help="print the output wikitext instead of saving to Commons",
1365
    )
1366
    sim.add_argument(
×
1367
        "--no-simulate",
1368
        action="store_true",
1369
        dest="no_simulate",
1370
        help="forces saving when disabled by --ignore-runpage",
1371
    )
1372
    parser.add_argument(
×
1373
        "--version", action="version", version="%(prog)s " + __version__
1374
    )
1375
    args = parser.parse_args()
×
1376

1377
    run_override = args.ignore_runpage
×
1378
    if run_override:
×
1379
        if args.no_simulate:
×
1380
            simulate = False
×
1381
        else:
1382
            simulate = True
×
1383
    else:
1384
        simulate = args.simulate
×
1385

1386
    site.login()
×
1387
    if args.auto:
×
1388
        main(total=args.total, start=args.start)
×
1389
    elif args.file and "File" in args.file:
×
1390
        main(page=pywikibot.Page(site, args.file))
×
1391
else:
1392
    run_override = False
1✔
1393
    simulate = False
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc