• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

AntiCompositeNumber / iNaturalistReviewer / 12455756921

22 Dec 2024 04:07PM UTC coverage: 62.943% (-0.7%) from 63.692%
12455756921

push

github

AntiCompositeNumber
Check untagged error log directly instead of skip list

191 of 346 branches covered (55.2%)

Branch coverage included in aggregate %.

7 of 9 new or added lines in 2 files covered. (77.78%)

66 existing lines in 1 file now uncovered.

643 of 979 relevant lines covered (65.68%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.46
/src/inrbot.py
1
#!/usr/bin/env python3
2
# coding: utf-8
3
# SPDX-License-Identifier: GPL-3.0-or-later
4
# Copyright 2023 AntiCompositeNumber
5

6
import argparse
1✔
7
import datetime
1✔
8
import hashlib
1✔
9
import itertools
1✔
10
import json
1✔
11
import logging
1✔
12
import logging.config
1✔
13
import os
1✔
14
import re
1✔
15
import string
1✔
16
import time
1✔
17
import urllib.parse
1✔
18
from hmac import compare_digest
1✔
19
from io import BytesIO
1✔
20
from pathlib import Path
1✔
21

22
import imagehash  # type: ignore
1✔
23
import mwparserfromhell as mwph  # type: ignore
1✔
24
import pywikibot  # type: ignore
1✔
25
import pywikibot.pagegenerators as pagegenerators  # type: ignore
1✔
26
import requests
1✔
27
import PIL.Image  # type: ignore
1✔
28
import waybackpy
1✔
29

30
from typing import NamedTuple, Optional, Set, Tuple, Dict, Union, cast, Callable, List
1✔
31
from typing import Any, Iterator
1✔
32

33
import acnutils
1✔
34

35
__version__ = "2.5.2"

# Module-wide logger, created through acnutils with inrbot.log as filename.
logger = acnutils.getInitLogger("inrbot", level="VERBOSE", filename="inrbot.log")

# Every pywikibot object in this module is bound to Wikimedia Commons.
site = pywikibot.Site("commons", "commons")
username = site.username()
summary_tag = f"(inrbot {__version__})"
# Identifies the bot and its operator on every request made via ``session``.
user_agent = (
    f"Bot iNaturalistReviewer/{__version__} "
    "on Wikimedia Toolforge "
    f"(Contact: https://commons.wikimedia.org/wiki/User:{username}; "
    "https://www.inaturalist.org/people/anticompositenumber "
    "tools.inaturalistreviewer@tools.wmflabs.org) "
    f"Python requests/{requests.__version__}"
)

# Shared HTTP session so the user agent is sent with every request.
session = requests.Session()
session.headers.update({"user-agent": user_agent})
# Download sizes keyed by the time they were recorded (see bytes_throttle).
recent_bytes: Dict[datetime.datetime, int] = {}
# Compared with the config page's latest revision in check_config.
conf_ts: Optional[datetime.datetime] = None

# Registries of callables; compare_methods is filled by init_compare_methods.
compare_methods: List[Tuple[str, Callable]] = []
pre_save_hooks: List[Callable] = []
id_hooks: List[Callable] = []
status_hooks: List[Callable] = []
lock_hooks: List[Callable] = []
61

62

63
class iNaturalistID(NamedTuple):
    """Identifier of an iNaturalist object such as an observation or a photo.

    Equality, inequality and hashing consider only ``id`` and ``type``;
    ``url`` is auxiliary data and never takes part in comparisons.
    """

    id: str
    type: str
    url: str = ""

    def __str__(self):
        """Return the www.inaturalist.org URL built from type and id."""
        return f"https://www.inaturalist.org/{self.type}/{self.id}"

    def __eq__(self, other):
        """Compare with another ID, or with the ID held by an image."""
        if isinstance(other, iNaturalistID):
            return self.id == other.id and self.type == other.type
        elif isinstance(other, iNaturalistImage):
            return self.id == other.id.id and self.type == other.id.type
        else:
            return NotImplemented

    def __ne__(self, other):
        # tuple supplies its own __ne__ that would also compare ``url``;
        # derive inequality from __eq__ so the two can never disagree.
        result = self.__eq__(other)
        return result if result is NotImplemented else not result

    def __hash__(self):
        # Must agree with __eq__: IDs that differ only in ``url`` are equal
        # and therefore need the same hash to work as dict keys.
        return hash((self.id, self.type))
78

79

80
class RestartBot(RuntimeError):
    """Raised to request a restart, e.g. when the on-wiki config changed."""
82

83

84
class ProcessingError(Exception):
    """A file could not be reviewed.

    Attributes:
        reason_code: Short code for the failure (values used in this module
            include "nourl", "apierr", "notfound" and "notmatching").
        description: Optional human-readable explanation.
    """

    def __init__(self, reason_code: str, description: str = ""):
        self.reason_code = reason_code
        self.description = description
88

89

90
class StopReview(Exception):
    """Raised when a page must not be reviewed; ``reason`` says why."""

    def __init__(self, reason: str):
        self.reason = reason
93

94

95
def get_config() -> Tuple[dict, datetime.datetime]:
    """Load the on-wiki configuration.

    Returns:
        A ``(config, loaded_at)`` tuple: the decoded JSON text of
        ``User:iNaturalistReviewBot/config.json`` and the time it was read,
        as a naive datetime expressed in UTC.
    """
    page = pywikibot.Page(site, "User:iNaturalistReviewBot/config.json")
    conf_json = json.loads(page.text)
    logger.info(f"Loaded config from {page.title(as_link=True)}")
    logger.debug(json.dumps(conf_json, indent=2))
    # Same value utcnow() produced (naive UTC) without the deprecated call.
    ts = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    return conf_json, ts
103

104

105
def check_config() -> None:
    """Request a restart if the config page changed after it was loaded.

    Does nothing while ``conf_ts`` is unset.

    Raises:
        RestartBot: If the latest revision of the config page is newer than
            ``conf_ts``.
    """
    page = pywikibot.Page(site, "User:iNaturalistReviewBot/config.json")
    if conf_ts and page.latest_revision.timestamp > conf_ts:
        raise RestartBot("Configuration has been updated, bot will restart")
109

110

111
def init_compare_methods() -> None:
    """Rebuild ``compare_methods`` from ``config["compare_methods"]``.

    The resulting order is fixed (sha1 before phash), independent of the
    order in which the methods are listed in the configuration.
    """
    global compare_methods
    compare_methods = []
    available = (("sha1", compare_sha1), ("phash", compare_phash))
    for method_name, method_func in available:
        if method_name in config["compare_methods"]:
            compare_methods.append((method_name, method_func))
118

119

120
def files_to_check(start: Optional[str] = None) -> Iterator[pywikibot.page.BasePage]:
    """Iterate the files needing review from Commons.

    Yields the namespace-6 members of "Category:INaturalist review needed".
    The heartbeat is updated once, when iteration begins.

    Args:
        start: Forwarded to CategorizedPageGenerator as its ``start``
            argument.
    """
    category = pywikibot.Category(site, "Category:INaturalist review needed")
    do_heartbeat()
    yield from pagegenerators.CategorizedPageGenerator(
        category, namespaces=6, start=start
    )
128

129

130
def untagged_files_to_check() -> Iterator[pywikibot.page.BasePage]:
    """Iterate files returned by the configured PetScan query.

    When ``config["find_untagged"]`` is falsy nothing is requested. Any
    failure while fetching or decoding the PetScan response is logged and
    treated as an empty result rather than raised.

    Yields:
        A FilePage for every entry in the query result.
    """
    pages: List[dict] = []
    if config.get("find_untagged"):
        try:
            res = session.get(
                config["petscan_url"], params=config["untagged_petscan_query"]
            )
            res.raise_for_status()

            data = res.json()
            # Explicit check instead of assert, which is stripped under -O.
            if data["n"] != "result":
                raise ValueError(f"Unexpected PetScan response: n={data['n']!r}")
            pages = data["*"][0]["a"]["*"]
        except Exception as err:
            logger.exception(err)
            pages = []
        logger.info(f"Found {len(pages)} untagged files to check")

    do_heartbeat()
    for page_data in pages:
        yield pywikibot.FilePage(site, title=page_data["title"])
151

152

153
def gbif_to_ina_url(url: urllib.parse.ParseResult) -> str:
    """Resolve a GBIF occurrence URL to the URL GBIF lists as its reference.

    Args:
        url: Parsed URL whose path is expected to be ``/occurrence/<key>``.

    Returns:
        The ``references`` field of the GBIF occurrence record, or an empty
        string when the path is not an occurrence path or the field is
        missing.

    Raises:
        requests.exceptions.HTTPError: If the GBIF API returns an error
            status.
    """
    path = url.path.split(sep="/")
    # Short paths and an empty occurrence key cannot name an occurrence.
    if len(path) < 3 or path[1] != "occurrence" or not path[2]:
        return ""
    api_url = f"https://api.gbif.org/v1/occurrence/{path[2]}"
    res = session.get(api_url)
    res.raise_for_status()
    return res.json().get("references", "")
161

162

163
def parse_ina_url(raw_url: str) -> Optional[iNaturalistID]:
    """Parse an iNaturalist URL into an iNaturalistID named tuple.

    Three URL shapes are recognised:

    * ``/<type>/<id>`` on a host ending with one of
      ``config["inaturalist_domains"]``;
    * ``/<type>/<id>/<file>`` on one of the two photo hosts;
    * ``/<a>/<b>`` on www.gbif.org, which is resolved through the GBIF API
      and parsed again.

    Returns:
        The parsed ID, or None if the URL matches none of these shapes.
    """
    url = urllib.parse.urlparse(raw_url)
    path = url.path.split(sep="/")
    netloc = url.netloc.lower()
    depth = len(path)

    if depth == 3 and any(
        netloc.endswith(domain) for domain in config["inaturalist_domains"]
    ):
        return iNaturalistID(type=path[1], id=str(path[2]))
    if depth == 4 and netloc in (
        "inaturalist-open-data.s3.amazonaws.com",
        "static.inaturalist.org",
    ):
        return iNaturalistID(type=path[1], id=str(path[2]))
    if depth == 3 and netloc == "www.gbif.org":
        return parse_ina_url(gbif_to_ina_url(url))
    return None
181

182

183
class Image:
    """Base class for an image with lazily computed hashes.

    Subclasses provide ``image`` and are expected to set an ``id`` attribute
    if they want to take part in equality comparisons.
    """

    def __init__(
        self,
        raw: Optional[bytes] = None,
        image: "Optional[PIL.Image.Image]" = None,
        sha1: str = "",
        phash: "Optional[imagehash.ImageHash]" = None,
    ):
        self._raw = raw
        self._image = image
        self._sha1 = sha1
        self._phash = phash

    @property
    def phash(self) -> "imagehash.ImageHash":
        """Perceptual hash of ``image``, computed on first access."""
        if not self._phash:
            self._phash = imagehash.phash(self.image)
        return self._phash

    @property
    def image(self):
        """The decoded image; must be provided by subclasses."""
        raise NotImplementedError

    def __repr__(self) -> str:
        paras = ", ".join(
            f"{key}={repr(value)}" for key, value in self.__dict__.items()
        )
        return f"{type(self).__name__}({paras})"

    def __eq__(self, other):
        """Compare by ``id`` with another Image or with an iNaturalistID.

        Instances without an ``id`` (the base class never sets one) are not
        comparable this way; NotImplemented is returned so Python falls back
        to identity instead of raising AttributeError.
        """
        own_id = getattr(self, "id", None)
        if own_id is None:
            return NotImplemented
        if isinstance(other, Image):
            other_id = getattr(other, "id", None)
            if other_id is None:
                return NotImplemented
            return own_id == other_id
        elif isinstance(other, iNaturalistID):
            return own_id == other
        else:
            return NotImplemented
219

220

221
class iNaturalistImage(Image):
    """An iNaturalist photo, downloaded only when its content is needed."""

    # SHA-1 digests shared by all instances, keyed by photo ID, so a photo
    # that was hashed once is not downloaded again just for its digest.
    _cache: Dict[iNaturalistID, str] = {}

    def __init__(self, id: iNaturalistID, **kwargs):
        self.id = id
        super().__init__(**kwargs)

    @property
    def raw(self) -> bytes:
        """The photo's bytes, fetched through acnutils.retry on first use."""
        if not self._raw:
            self._raw = acnutils.retry(get_ina_image, 3, photo=self.id)
        return cast(bytes, self._raw)

    @property
    def image(self) -> PIL.Image.Image:
        """The photo opened as a PIL image, decoded from ``raw``."""
        if self._image is None:
            self._image = PIL.Image.open(BytesIO(self.raw))
        return self._image

    @property
    def sha1(self) -> str:
        """Hex SHA-1 of ``raw``, taken from the class cache when present."""
        if not self._sha1:
            cached = self._cache.get(self.id)
            if cached is not None:
                self._sha1 = cached
            else:
                self._sha1 = hashlib.sha1(self.raw).hexdigest()
                self._cache[self.id] = self._sha1
        return self._sha1
251

252

253
class CommonsImage(Image):
    """The file behind a Commons file page."""

    def __init__(self, page: pywikibot.FilePage, **kwargs):
        self.page = page
        if self.page:
            # force=True bypasses pywikibot's cached copy of the page text.
            self.page.get(force=True)
        super().__init__(**kwargs)

    @property
    def raw(self):
        """Raw bytes are not provided for Commons files.

        Raises:
            NotImplementedError: Always. The NotImplemented singleton is
                reserved for binary operators and is not a usable value.
        """
        raise NotImplementedError("CommonsImage does not provide raw bytes")

    @property
    def image(self) -> PIL.Image.Image:
        """Download original Commons file and open as a PIL image"""
        if self._image is None:
            url = self.page.get_file_url()
            response = session.get(url)
            response.raise_for_status()
            self._image = PIL.Image.open(BytesIO(response.content))
        return self._image

    @property
    def sha1(self) -> str:
        """SHA-1 reported by the wiki for the latest file version."""
        if not self._sha1:
            self._sha1 = self.page.latest_file_info.sha1
        return self._sha1
279

280

281
def compare_sha1(com_img: CommonsImage, ina_img: iNaturalistImage) -> bool:
    """Return True if both images have the same SHA-1 digest."""
    commons_sum = com_img.sha1
    logger.debug(f"Commons sha1sum:     {commons_sum}")
    ina_sum = ina_img.sha1
    logger.debug(f"iNaturalist sha1sum: {ina_sum}")
    return compare_digest(commons_sum, ina_sum)
285

286

287
def compare_phash(com_img: CommonsImage, ina_img: iNaturalistImage) -> bool:
    """Return True if the perceptual hashes are close enough.

    The difference of the two hashes must not exceed
    ``config["max_phash_dist"]`` (4 when the key is absent).
    """
    diff = com_img.phash - ina_img.phash
    logger.debug(f"PHash Hamming distance: {diff}")
    return diff <= config.get("max_phash_dist", 4)
291

292

293
def get_ina_image(photo: iNaturalistID, final: bool = False) -> bytes:
    """Download original photo from iNaturalist

    Args:
        photo: ID of the photo. If ``photo.url`` is set, its host and file
            extension are reused; otherwise the open-data S3 host and the
            ``jpeg`` extension are used.
        final: Set on the internal retry so that it is attempted only once.

    Returns:
        The body of the HTTP response.

    Raises:
        requests.exceptions.HTTPError: If the last response has an error
            status.
    """
    if photo.url:
        extension = photo.url.partition("?")[0].rpartition(".")[2]
        domain = photo.url.partition("//")[2].partition("/")[0]
    else:
        extension = "jpeg"
        domain = "inaturalist-open-data.s3.amazonaws.com"
    # TODO: Replace this hardcoded URL
    url = f"https://{domain}/photos/{photo.id}/original.{extension}"
    response = session.get(url)
    if response.status_code == 403 and not final:
        # A 403 triggers one more attempt with "jpeg" swapped for "jpg".
        retry_photo = photo._replace(url=url.replace("jpeg", "jpg"))
        return get_ina_image(retry_photo, final=True)
    response.raise_for_status()
    return response.content
308

309

310
def bytes_throttle(
    length: int, hour_limit: float = 4.5e9, day_limit: float = 23.5e9
) -> None:
    """Record a download and sleep if the recent volume is too high.

    ``length`` is added to ``recent_bytes``; entries older than 24 hours are
    dropped. If the 24-hour total reaches ``day_limit`` the call sleeps for
    12 hours; otherwise, if the 1-hour total reaches ``hour_limit``, it
    sleeps for 30 minutes.

    Args:
        length: Number of bytes just transferred.
        hour_limit: Threshold for the last hour, in bytes.
        day_limit: Threshold for the last 24 hours, in bytes.
    """
    logger.debug(f"Content length: {length}")
    # One reference time for both the new key and the age calculation, so
    # the entry just added can never appear to lie in the future.
    now = datetime.datetime.now()
    recent_bytes[now] = recent_bytes.get(now, 0) + length

    one_hour = datetime.timedelta(hours=1)
    one_day = datetime.timedelta(hours=24)
    last_hour = 0
    last_day = 0
    # Iterate over a snapshot because expired entries are deleted on the go.
    for date, val in list(recent_bytes.items()):
        age = now - date
        if age > one_day:
            del recent_bytes[date]
            continue
        last_day += val
        if age <= one_hour:
            last_hour += val

    logger.debug(f"Hour total: {last_hour}, day total: {last_day}")
    if last_day >= day_limit:
        logger.error(
            f"{last_day} bytes transferred in last 24h, approaching iNaturalist limits!"
        )
        sleep_time = 3600 * 12  # 12 hours
    elif last_hour >= hour_limit:
        logger.error(
            f"{last_hour} bytes transferred in last hour, "
            "approaching iNaturalist limits!"
        )
        sleep_time = 60 * 30  # 30 minutes
    else:
        return None
    logger.info(f"Sleeping for {sleep_time} seconds")
    time.sleep(sleep_time)
    return None
345

346

347
def do_heartbeat() -> None:
    """Update the timestamp on a file (if provided)

    Works with inrbot-healthcheck.sh when the HEARTBEAT_FILE environment variable is set
    """
    heartbeat_file = os.environ.get("HEARTBEAT_FILE")
    if heartbeat_file:
        Path(heartbeat_file).touch()
354

355

356
class Aliases:
    """Names under which a Commons license template may appear in wikitext."""

    # Shared by all instances: title -> {"last_update": monotonic time,
    # "aliases": set of lower-cased names}. Entries are reused for an hour.
    alias_cache: Dict[str, Dict[str, Union[float, Set[str]]]] = {}

    def __init__(self, title: str) -> None:
        self.title: str = title
        self._aliases: Optional[Set[str]] = None

    def get_aliases(self) -> None:
        """Collect the alias set from the wiki into ``_aliases``.

        The set holds the lower-cased titles of the template itself, of the
        redirects pointing to it, and of the templates embedding it (with
        any subpage part removed).
        """
        canon_page = pywikibot.Page(site, f"Template:{self.title}")
        aliases = {
            page.title(with_ns=False).lower()
            for page in canon_page.backlinks(filter_redirects=True, namespaces=10)
        }
        aliases.add(canon_page.title(with_ns=False).lower())
        aliases.update(
            page.title(with_ns=False).lower().partition("/")[0]
            for page in canon_page.embeddedin(namespaces=10)
        )
        self._aliases = aliases

    @property
    def aliases(self):
        """The alias set, loaded from the class cache or from the wiki."""
        if self._aliases is None:
            cached = self.alias_cache.get(self.title)
            if cached is None or time.monotonic() - cached["last_update"] > 3600:
                self.get_aliases()
                self.alias_cache[self.title] = {
                    "last_update": time.monotonic(),
                    "aliases": self._aliases,
                }
            else:
                self._aliases = cached["aliases"]
        return self._aliases

    def is_license(self, template: "mwph.nodes.Template") -> bool:
        """Return True if ``template`` is this license or ``{{self}}``.

        Surrounding whitespace in the template name is ignored, so a name
        followed by a newline before its first parameter still matches.
        """
        name = str(template.name).strip().lower()
        return name in self.aliases or name == "self"
396

397

398
def get_observation_from_photo(photo_id: iNaturalistID) -> iNaturalistID:
    """Find the observation a photo belongs to by reading the photo's page.

    Args:
        photo_id: ID whose ``type`` is "photos".

    Returns:
        The ID of the first observation linked from the photo page.

    Raises:
        ProcessingError: With code "nourl" if the page cannot be fetched or
            contains no observation link.
    """
    assert photo_id.type == "photos"
    try:
        res = session.get(str(photo_id))
        res.raise_for_status()
    except Exception as err:
        raise ProcessingError("nourl", "No observation ID could be found") from err
    # Yes, I know I'm parsing HTML with a regex.
    # \d+ rather than \d*: a link without digits is not an observation ID.
    match = re.search(r"/observations/(\d+)\"", res.text)
    if not match:
        raise ProcessingError("nourl", "No observation ID could be found")
    return iNaturalistID(type="observations", id=match.group(1))
411

412

413
class CommonsPage:
1✔
414
    def __init__(
        self,
        page: pywikibot.FilePage,
        throttle: Optional[acnutils.Throttle] = None,
        ina_throttle: acnutils.Throttle = acnutils.Throttle(10),
    ) -> None:
        """Set up the review state for one file page.

        Args:
            page: The Commons file page under review.
            throttle: Optional throttle, invoked before iNaturalist API
                requests and between photo comparisons.
            ina_throttle: Stored on the instance. The default object is
                created once, when the class is defined, so it is shared by
                every instance that does not pass its own.
                NOTE(review): sharing looks intentional -- confirm.
        """
        self.page = page
        # None means "not determined yet" for the lazily filled values.
        self._com_license: Optional[str] = None
        self._ina_license: Optional[str] = None
        self._status = ""
        self._ina_author: Optional[str] = None
        self._ina_data: dict = {}
        self._is_old: Optional[bool] = None
        self._no_del: Optional[bool] = None
        self._archive = ""
        self.throttle = throttle
        self.ina_throttle = ina_throttle
        self.reason = ""
        self._photo_id: Optional[iNaturalistID] = None
        self._raw_photo_id: Optional[iNaturalistID] = None
        self._obs_id: Optional[iNaturalistID] = None
        self._locked = False
        self.photo_id_source = ""
        self.log_page = pywikibot.Page(site, config["untagged_log_page"])
438

439
    @property
    def locked(self) -> bool:
        """Whether the review values have been frozen."""
        return self._locked

    @locked.setter
    def locked(self, value: bool):
        """Lock the instance; a locked instance can not be unlocked.

        Raises:
            TypeError: If the instance is locked and ``value`` is False.
        """
        if self._locked is False:
            self._locked = value
        elif value is False:
            raise TypeError("Can not unlock parameters")
449

450
    def lock(self):
        """Run every lock hook, then freeze the instance.

        Does nothing if the instance is already locked.
        """
        if self.locked is not False:
            return
        for hook in lock_hooks:
            hook(self)
        self.locked = True
455

456
    def _set_locking(self, attr: str, value: Any) -> None:
        """Assign ``value`` to ``attr`` unless the instance is locked.

        Args:
            attr: Name of the private attribute, including its leading
                underscore (the underscore is dropped in the error message).
            value: The new value.

        Raises:
            TypeError: If the instance is locked.
        """
        if self.locked:
            raise TypeError(f"{attr[1:]} has already been read, and can not be changed")
        setattr(self, attr, value)
461

462
    def _get_locking_str(self, attr: str, setter: Optional[Callable] = None) -> str:
        """Return the string stored in ``attr``, computing it if needed.

        While the attribute is None: a locked instance stores and returns
        an empty string; an unlocked one calls ``setter``, which is expected
        to assign the attribute.

        Raises:
            AttributeError: If the value is unset, the instance is unlocked
                and no ``setter`` was given.
        """
        if getattr(self, attr) is None:
            if self.locked:
                setattr(self, attr, "")
            elif setter is not None:
                setter()
            else:
                raise AttributeError(attr[1:])
        return getattr(self, attr)
471

472
    def check_can_run(self) -> bool:
        """Determine whether the bot should run on this page."""
        page = self.page
        # Skip files that are still reported as an error
        if not self.check_has_template() and self.check_untagged_log():
            return False
        # Skip if the bot can't edit the page, due to permissions or {{bots}}
        if not page.has_permission("edit"):
            return False
        if not page.botMayEdit():
            return False
        # Skip if there's already a review template with parameters
        if re.search(r"{{[iI][nN]aturalist[rR]eview *?\|.*?}}", page.text):
            return False
        return True
487

488
    def check_has_template(self) -> bool:
        """Return True if the page text contains a review template."""
        found = re.search(r"{{[iI][nN]aturalist[rR]eview", self.page.text)
        return found is not None
490

491
    def check_stop_cats(self) -> None:
        """Abort the review if the page is in a configured stop category.

        Raises:
            StopReview: With the string form of the matching categories.
        """
        stop_cats = {
            pywikibot.Category(site, title) for title in config["stop_categories"]
        }
        page_stop = stop_cats & set(self.page.categories())
        if page_stop:
            raise StopReview(str(page_stop))
499

500
    def find_ina_id(self) -> None:
        """Find iNaturalist IDs in the page and store them on the instance.

        The page's external links are parsed first, then every ID hook is
        asked for an ID; hook results take precedence over link results.
        The first observation is stored in ``obs_id`` and the first photo in
        ``raw_photo_id`` (None where nothing of that kind was found).

        Raises:
            ProcessingError: With code "nourl" if neither an observation nor
                a photo ID was found.
        """
        # IDs must consist of ASCII digits only. The former check,
        # re.search(r"[A-z]", ...), also matched "[", "\\", "]", "^", "_"
        # and "`", and let empty IDs through.
        numeric_id = re.compile(r"[0-9]+")
        photos = []
        observations = []

        for url in self.page.extlinks():
            url_id = parse_ina_url(url)
            if (
                url_id is None
                or numeric_id.fullmatch(url_id.id) is None
                or url_id in photos
                or url_id in observations
            ):
                continue  # pragma: no cover
            elif url_id.type == "observations":
                observations.append(url_id)
            elif url_id.type == "photos":
                photos.append(url_id)

        for hook in id_hooks:
            hook_id = hook(self, observations=observations.copy(), photos=photos.copy())
            if hook_id is None or numeric_id.fullmatch(hook_id.id) is None:
                continue  # pragma: no cover
            elif hook_id.type == "observations":
                observations.insert(0, hook_id)
            elif hook_id.type == "photos":
                photos.insert(0, hook_id)
                # A photo supplied by a hook discards the observations
                # collected so far.
                observations = []

        if not photos and not observations:
            raise ProcessingError("nourl", "No observation ID could be found")
        self.obs_id = observations[0] if observations else None
        self.raw_photo_id = photos[0] if photos else None
540

541
    @property
    def photo_id(self) -> Optional[iNaturalistID]:
        """ID of the matched iNaturalist photo, or None if not set."""
        return self._photo_id

    @photo_id.setter
    def photo_id(self, value: iNaturalistID):
        # Raises TypeError once the instance is locked.
        self._set_locking("_photo_id", value)
548

549
    @property
    def raw_photo_id(self) -> Optional[iNaturalistID]:
        """Photo ID as found on the page, before any image comparison."""
        return self._raw_photo_id

    @raw_photo_id.setter
    def raw_photo_id(self, value: iNaturalistID):
        # Unlike most setters of this class, this one ignores the lock.
        self._raw_photo_id = value
556

557
    @property
    def obs_id(self) -> Optional[iNaturalistID]:
        """Observation ID, looked up from ``raw_photo_id`` when missing.

        The lookup only happens while the instance is unlocked.
        """
        if not self._obs_id and not self.locked and self.raw_photo_id:
            self._obs_id = get_observation_from_photo(self.raw_photo_id)
        return self._obs_id

    @obs_id.setter
    def obs_id(self, value: iNaturalistID) -> None:
        # Raises TypeError once the instance is locked.
        self._set_locking("_obs_id", value)

    @obs_id.deleter
    def obs_id(self) -> None:
        """Forget the observation ID together with the cached API data.

        Raises:
            TypeError: If the instance is locked.
        """
        if self.locked:
            raise TypeError("obs_id can not be deleted once locked")
        self._obs_id = None
        del self.ina_data
575

576
    @property
    def ina_data(self) -> dict:
        """Make API request to iNaturalist from an ID and ID type

        Returns a dict of the API result. The result is cached; deleting
        the property clears the cache.

        Raises:
            ProcessingError: With code "apierr" if the ID is not an
                observation, the request fails with an HTTP error, the body
                is not JSON, or the response does not contain exactly one
                usable result.
        """
        if not self._ina_data:
            obs_id = self.obs_id
            assert obs_id
            if obs_id.type != "observations":
                raise ProcessingError("apierr", "iNaturalist ID is wrong type")
            url = f"https://api.inaturalist.org/v1/observations/{obs_id.id}"

            if self.throttle:
                self.throttle.throttle()
            try:
                response = session.get(url, headers={"Accept": "application/json"})
                response.raise_for_status()
                response_json = response.json()
            except (ValueError, requests.exceptions.HTTPError) as err:
                raise ProcessingError("apierr", "iNaturalist API error") from err

            if response_json.get("total_results") != 1:
                logger.debug(response_json)
                raise ProcessingError("apierr", f"iNaturalist API error in {url}")
            # "or [None]" also covers an empty or null results list, which
            # would otherwise escape as IndexError or TypeError.
            res = (response_json.get("results") or [None])[0]
            if not res:
                raise ProcessingError(
                    "apierr", f"No data recieved from iNaturalist in {url}"
                )
            self._ina_data = res
        return self._ina_data

    @ina_data.deleter
    def ina_data(self) -> None:
        self._ina_data = {}
612

613
    def get_ina_license(self) -> None:
        """Find the image license in the iNaturalist API response

        The license code of the photo matching ``photo_id`` is translated
        through ``config["ina_licenses"]`` and stored in ``ina_license``. A
        missing or empty code is looked up under the key "null".

        The API does not return CC version numbers, but the website has 4.0 links.
        CC 4.0 licenses are assumed.

        Raises:
            ProcessingError: With code "inatlicense" if the photo is not
                part of the observation or its code has no configured
                mapping.
        """
        assert self.photo_id
        licenses = config["ina_licenses"]
        photos: list = self.ina_data.get("photos", [])
        for photo_data in photos:
            if str(photo_data.get("id")) == self.photo_id.id:
                license_code = photo_data.get("license_code") or "null"
                break
        else:
            raise ProcessingError("inatlicense", "No iNaturalist license found")

        try:
            self.ina_license = licenses[license_code]
        except KeyError as e:
            raise ProcessingError("inatlicense", "No iNaturalist license found") from e
        logger.info(f"iNaturalist License: {self.ina_license}")
640

641
    @property
    def ina_license(self) -> str:
        """License of the iNaturalist photo, determined on first access."""
        return self._get_locking_str("_ina_license", self.get_ina_license)

    @ina_license.setter
    def ina_license(self, value: str) -> None:
        # Raises TypeError once the instance is locked.
        self._set_locking("_ina_license", value)
648

649
    def find_photo_in_obs(self, recurse: bool = True) -> None:
        """Find the matching image in an iNaturalist observation

        Every photo of the observation is compared with the Commons file,
        one comparison method after the other. On the first match
        ``photo_id`` and ``reason`` are set and the method returns.

        Args:
            recurse: If nothing matches and ``raw_photo_id`` is not among
                the observation's photos, drop the observation ID (so it is
                looked up again from the photo) and try once more.

        Raises:
            ProcessingError: "notfound" if the observation has no photos,
                "notmatching" if no photo matches.
        """
        images = [
            iNaturalistImage(
                id=iNaturalistID(type="photos", id=str(photo["id"]), url=photo["url"])
            )
            for photo in self.ina_data["photos"]
        ]
        if len(images) < 1:
            raise ProcessingError("notfound", "No photos in observation")
        elif self.raw_photo_id:
            # False sorts before True, otherwise remains in original order
            # This will sort the matching photo before other photos in the obs,
            # but will still check those other images if no match.
            images.sort(key=lambda image: self.raw_photo_id != image)

        commons_image = CommonsImage(page=self.page)

        for comp_method, comp_func in compare_methods:
            logger.info(f"Comparing photos using {comp_method}")
            for image in images:
                logger.debug(f"Comparing {str(image.id)}")
                try:
                    res = comp_func(com_img=commons_image, ina_img=image)
                except Exception:
                    # A failed comparison counts as "no match", but leave a
                    # trace so download or decoding problems can be found.
                    logger.warning(
                        f"Comparison of {str(image.id)} failed", exc_info=True
                    )
                    res = False
                if res:
                    logger.info(f"Match found: {str(image.id)}")
                    self.reason = comp_method
                    self.photo_id = image.id
                    return
                elif self.throttle:
                    self.throttle.throttle()
        if self.raw_photo_id and self.raw_photo_id not in images and recurse:
            del self.obs_id
            self.find_photo_in_obs(recurse=False)
        else:
            raise ProcessingError("notmatching", "No matching photos found")
690

691
    def get_ina_author(self):
        """Store the observer's login name from the API data in ``ina_author``.

        An empty string is stored if the data has no user or no login.
        """
        user = self.ina_data.get("user", {})
        self.ina_author = user.get("login", "")
        logger.info(f"Author: {self.ina_author}")
694

695
    @property
    def ina_author(self) -> str:
        """Find the image author in the iNaturalist API response

        Returns a string with the username of the iNaturalist contributor
        """
        return self._get_locking_str("_ina_author", self.get_ina_author)

    @ina_author.setter
    def ina_author(self, value: str) -> None:
        # Raises TypeError once the instance is locked.
        self._set_locking("_ina_author", value)
706

707
    def get_com_license(self):
        """Find the license template currently used on the Commons page

        A template counts as a license if it is a member of
        "Category:Primary license tags (flat list)". Its title (without
        namespace) is stored in ``_com_license``; an empty string is stored
        if the page uses no such template. The page's templates are examined
        as a set, so with several license templates on a page the one that
        is picked is not defined.
        """

        category = pywikibot.Category(site, "Category:Primary license tags (flat list)")
        templates = set(self.page.itertemplates())
        license_tags = set(category.members(namespaces=10))

        found = next((tpl for tpl in templates if tpl in license_tags), None)
        if found is None:
            logger.info("No Commons license found!")
            self._com_license = ""
        else:
            self._com_license = found.title(with_ns=False)
        logger.info(f"Commons License: {self.com_license}")
726

727
    @property
    def com_license(self) -> str:
        """License template used on Commons, determined on first access."""
        return self._get_locking_str("_com_license", self.get_com_license)

    @com_license.setter
    def com_license(self, value: str) -> None:
        # Raises TypeError once the instance is locked.
        self._set_locking("_com_license", value)
734

735
    def compare_licenses(self) -> None:
        """Set ``status`` from the iNaturalist and Commons licenses.

        The result is "error", "fail", "pass" or "pass-change"; the inline
        comments below give the condition for each.
        """
        free_licenses = set(config["free_licenses"])
        source_license = self.ina_license

        if not source_license:
            # iNaturalist license wasn't found, call in the humans
            self.status = "error"
        elif source_license not in free_licenses:
            # Source license is non-free, failed license review
            self.status = "fail"
        elif source_license == self.com_license:
            # Licenses are the same, license review passes
            self.status = "pass"
        else:
            # Commons license doesn't match iNaturalist, update to match
            self.status = "pass-change"
750

751
    @property
    def status(self) -> str:
        """Checks the Commons license against the iNaturalist license

        Returns a string with the status
        Statuses:
            fail:       iNaturalist license is non-free
            error:      Bot could not determine
            pass:       Licenses match
            pass-change: Commons license changed to free iNaturalist license

        While the instance is unlocked, every access runs all status hooks
        (after computing the status if it is still empty).
        """
        if not self.locked:
            if not self._status:
                self.compare_licenses()
            for hook in status_hooks:
                hook(self)
        return self._status

    @status.setter
    def status(self, value):
        # Raises TypeError once the instance is locked.
        self._set_locking("_status", value)

    @status.deleter
    def status(self):
        # Resets to the empty string so the next read recomputes the status.
        self.status = ""
776

777
    def _file_is_old(self) -> bool:
        """Return True if the latest upload is older than the configured age.

        Always False unless ``config["old_fail"]`` is truthy. The age limit
        is ``config["old_fail_age"]`` days.
        """
        if not config.get("old_fail", False):
            return False

        timestamp = self.page.latest_file_info.timestamp
        # NOTE(review): wiki timestamps are presumably UTC -- confirm. The
        # current time is taken in UTC rather than local time, and made to
        # match the timestamp's naive/aware flavour so they can be subtracted.
        if timestamp.tzinfo is None:
            now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        else:
            now = datetime.datetime.now(timestamp.tzinfo)
        max_age = datetime.timedelta(days=config["old_fail_age"])
        return (now - timestamp) > max_age
788

789
    @property
    def is_old(self) -> bool:
        """Whether a failed file counts as old; False for any other status."""
        if self._is_old is None:
            if self.status == "fail":
                self._is_old = self._file_is_old()
            else:
                self._is_old = False
        return self._is_old

    @is_old.setter
    def is_old(self, value: bool) -> None:
        # Raises TypeError once the instance is locked.
        self._set_locking("_is_old", value)
801

802
    @property
    def no_del(self) -> bool:
        """Whether a failed file carries a template listed below.

        True if the status is "fail" and the page uses "Template:OTRS
        received" or "Template:Deletion template tag"; False otherwise.
        """
        if self._no_del is None:
            if self.status == "fail":
                page_templates = set(self.page.itertemplates())
                check_templates = {
                    pywikibot.Page(site, "Template:OTRS received"),
                    pywikibot.Page(site, "Template:Deletion template tag"),
                }
                self._no_del = not page_templates.isdisjoint(check_templates)
            else:
                self._no_del = False
        return self._no_del

    @no_del.setter
    def no_del(self, value) -> None:
        # Raises TypeError once the instance is locked.
        self._set_locking("_no_del", value)
819

820
    @property
    def archive(self) -> str:
        """Archive URL for the photo, looked up lazily.

        When no URL is stored yet:
        * with ``use_wayback`` enabled and a passing status, look for an
          existing snapshot and request a new one if none is found;
        * otherwise, for a "fail" status, or for any non-"error" status while
          ``wayback_get`` is enabled (the default), only look for an existing
          snapshot.
        """
        if self._archive:
            return self._archive

        if config.get("use_wayback") and self.status in ("pass", "pass-change"):
            self.get_old_archive()
            if not self._archive:
                self.save_archive()
        elif self.status == "fail" or (
            self.status != "error" and config.get("wayback_get", True)
        ):
            self.get_old_archive()
        return self._archive

    @archive.setter
    def archive(self, value: str) -> None:
        """Store the archive URL directly (not subject to locking)."""
        self._archive = value
1✔
836

837
    def save_archive(self) -> None:
        """Request a new Wayback Machine snapshot of the photo URL.

        On success the snapshot URL is stored in ``archive``. Any failure
        (including a response without an archive URL) is logged as a warning
        and ``archive`` is set to the empty string; nothing is raised.
        """
        try:
            url = waybackpy.Url(str(self.photo_id), user_agent).save()
            # Explicit check instead of ``assert`` so it survives ``python -O``.
            if url.archive_url is None:
                raise ValueError("Wayback Machine returned no archive URL")
            self.archive = url.archive_url
        except Exception as err:
            # Best effort: a missing archive must not abort the review.
            logger.warning("Failed to get archive", exc_info=err)
            self.archive = ""
×
845

846
    def get_old_archive(self) -> None:
        """Look up the oldest existing Wayback Machine snapshot of the photo.

        On success the snapshot URL is stored in ``archive`` and a "fail"
        status is changed to "fail-archive". Any failure (including a response
        without an archive URL) is logged at info level and ``archive`` is set
        to the empty string; nothing is raised.
        """
        try:
            url = waybackpy.Url(str(self.photo_id), user_agent).oldest()
            # Explicit check instead of ``assert`` so it survives ``python -O``.
            if url.archive_url is None:
                raise ValueError("Wayback Machine returned no archive URL")
            self.archive = url.archive_url
        except Exception as err:
            # Best effort: a missing archive must not abort the review.
            logger.info("Failed to get archive", exc_info=err)
            self.archive = ""
        else:
            # Only reached when a snapshot was found.
            if self.status == "fail":
                self.status = "fail-archive"
×
857

858
    def uploader_talk(self) -> pywikibot.page.Page:
        """Return the user talk page of whoever made the oldest file upload."""
        uploader = self.page.oldest_file_info.user
        return pywikibot.Page(site, f"User talk:{uploader}")
1✔
860

861
    def update_review(self) -> bool:
        """Write the review result into the page wikitext and save it.

        The object is locked first, then the review template is placed by the
        first rule that applies:

        1. replace every existing {{iNaturalistreview}} template;
        2. replace every template matching "License review";
        3. insert after each template that is an alias of the Commons license;
        4. with no known Commons license, insert before the first category or
           {{Uncategorized}} node, or append to the end of the page.

        For "pass-change" the license template is also swapped for (or, when
        none is known, preceded by) the iNaturalist license. For "fail" without
        ``no_del`` the configured failure tag is prepended to the page.

        Returns:
            True if ``save_page`` completed without raising, False if the
            wikitext was not changed or saving raised an exception.
        """
        logger.info(f"Status: {self.status} ({self.reason})")
        self.lock()
        code = mwph.parse(self.page.text)
        template = self.make_template()
        changed = False
        if self.check_has_template():
            # Already tagged for review, replace the existing template
            for review_template in code.ifilter_templates(
                matches=lambda t: t.name.strip().lower() == "inaturalistreview"
            ):
                code.replace(review_template, template)
                changed = True
        else:
            # Check for normal {{LicenseReview}} template
            for review_template in code.ifilter_templates(
                matches=lambda t: re.search(r"[Ll]icense ?[Rr]eview", str(t))
            ):
                code.replace(review_template, template)
                changed = True

            if not changed:
                # Not already tagged, try to put the review template under the license
                if self.com_license:
                    aliases = Aliases(self.com_license)
                    for pt2 in code.ifilter_templates(matches=aliases.is_license):
                        code.insert_after(pt2, "\n" + template)
                        changed = True
                else:
                    for node in code.ifilter(
                        matches=lambda n: re.search(
                            r"(\[\[Category:|\{\{Uncategorized)", str(n)
                        )
                    ):
                        code.insert_before(node, template + "\n\n")
                        changed = True
                        break
                    else:
                        # No category marker anywhere: fall back to the page end.
                        code.append("\n\n" + template)
                        changed = True

        if not changed:
            logger.info("Page not changed")
            return False

        if self.status == "pass-change":
            if self.com_license:
                aliases = Aliases(self.com_license)
                for pt2 in code.ifilter_templates(matches=aliases.is_license):
                    code.replace(pt2, ("{{%s}}" % self.ina_license))
            else:
                code.insert_before(template, ("{{%s}}" % self.ina_license))

        if self.status == "fail" and not self.no_del:
            code.insert(
                0,
                string.Template(
                    config["old_fail_tag"] if self.is_old else config["fail_tag"]
                ).safe_substitute(
                    review_license=self.ina_license,
                    source_url=str(self.photo_id) if self.photo_id else "",
                ),
            )

        if self.status in ["pass", "pass-change"] and config.get("tag_source"):
            self.add_source_tag(code)

        if self.throttle is not None:
            self.throttle.throttle()
        try:
            self.save_page(str(code))
        except Exception as err:
            # Use the bot's own logger (not the root logger) so the failure
            # lands in the same log as every other message from this module.
            logger.exception(err)
            return False
        else:
            return True
1✔
938

939
    def make_template(self) -> str:
        """Build the review template wikitext for the current status.

        Locks the object first. A "stop" status produces an empty string;
        otherwise the config entry named after the status is used as a
        ``string.Template`` and filled in with the review details.
        """
        self.lock()
        if self.status == "stop":
            return ""

        review_template = string.Template(config[self.status])
        return review_template.safe_substitute(
            status=self.status,
            author=self.ina_author,
            source_url=str(self.photo_id) if self.photo_id else "",
            review_date=datetime.date.today().isoformat(),
            reviewer=username,
            review_license=self.ina_license,
            upload_license=self.com_license,
            reason=self.reason,
            archive=self.archive,
        )
1✔
957

958
    def add_source_tag(self, code: mwph.wikicode.Wikicode) -> None:
        """Insert {{iNaturalist}} and {{gbif}} source templates into ``code``.

        Does nothing when there is no observation ID, when the ``tag_source``
        config option is falsy, or when both templates are already on the page
        (or no GBIF outlink exists). ``code`` is modified in place.

        The new templates go after the last top-level template of the
        "filedesc" section; if there is no such section or it has no templates,
        they go after the first {{iNaturalistreview}} template instead.
        """
        # Check the cheap preconditions first so the page's template list
        # (presumably an API request) is only fetched when it is needed.
        if not self.obs_id or not config["tag_source"]:
            return

        templates = set(self.page.itertemplates())
        source_tag = ""
        if pywikibot.Page(site, "Template:INaturalist") not in templates:
            source_tag += "\n{{iNaturalist|%s}}" % self.obs_id.id

        gbif_links = [
            link
            for link in self.ina_data.get("outlinks", [])
            if link["source"] == "GBIF"
        ]
        if gbif_links and pywikibot.Page(site, "Template:Gbif") not in templates:
            # The GBIF identifier is the last path component of the outlink URL.
            gbif_id = gbif_links[0]["url"].split("/")[-1]
            source_tag += "\n{{gbif|%s}}" % gbif_id

        if not source_tag:
            return

        try:
            # Place templates at the bottom of =={{int:filedesc}}==,
            # after any other templates but before categories/other text
            prev = code.get_sections(matches="filedesc")[0].filter_templates(
                recursive=False
            )[-1]
        except IndexError:
            # If there is no Summary section, just place after {{iNaturalistreview}}
            prev = code.filter_templates(
                matches=lambda t: t.name.strip().lower() == "inaturalistreview"
            )[0]

        code.insert_after(prev, source_tag)
1✔
991

992
    def save_page(self, new_text: str) -> None:
        """Save ``new_text`` as the new wikitext of this file page.

        The edit summary is built from the ``review_summary`` config template
        and every hook in ``pre_save_hooks`` is called before saving. When the
        global ``simulate`` flag is set, the summary and text are only logged
        and nothing is written to Commons.
        """
        summary = string.Template(config["review_summary"]).safe_substitute(
            status=self.status,
            review_license=self.ina_license,
            version=__version__,
            tag=summary_tag,
        )
        for pre_save_hook in pre_save_hooks:
            pre_save_hook(self, new_text=new_text, summary=summary)

        if simulate:
            logger.info("Saving disabled")
            logger.debug(summary)
            logger.debug(new_text)
            return

        acnutils.check_runpage(site, override=run_override)
        logger.info(f"Saving {self.page.title()}")
        acnutils.retry(
            acnutils.save_page,
            3,
            text=new_text,
            page=self.page,
            summary=summary,
            bot=False,
            minor=False,
        )
1✔
1027

1028
    def fail_warning(self) -> None:
        """Append a failed-review notice to the uploader's talk page.

        The message comes from the ``old_fail_warn`` config template for old
        files and ``fail_warn`` otherwise. When the global ``simulate`` flag is
        set, the summary and message are only logged.
        """
        user_talk = self.uploader_talk()
        warn_key = "old_fail_warn" if self.is_old else "fail_warn"
        message = string.Template(config[warn_key]).safe_substitute(
            filename=self.page.title(with_ns=True),
            review_license=self.ina_license,
            source_url=str(self.photo_id) if self.photo_id else "",
        )
        summary = string.Template(config["review_summary"]).safe_substitute(
            status="fail",
            review_license=self.ina_license,
            version=__version__,
            tag=summary_tag,
        )

        if simulate:
            logger.info("Saving disabled")
            logger.info(summary)
            logger.info(message)
            return

        acnutils.check_runpage(site, override=run_override)
        logger.info(f"Saving {user_talk.title()}")
        acnutils.retry(
            acnutils.save_page,
            3,
            text=message,
            page=user_talk,
            summary=summary,
            bot=False,
            minor=False,
            mode="append",
        )
×
1060

1061
    def log_untagged_error(self) -> None:
        """Append this file to the untagged error log page.

        Does nothing in simulate mode or when the page title already appears
        in the log page text.
        """
        if simulate:
            return
        if self.page.title() in self.log_page.text:
            # Already listed, avoid duplicate log lines.
            return

        log_line = string.Template(config["untagged_log_line"]).safe_substitute(
            status=self.status,
            reason=self.reason,
            link=self.page.title(as_link=True, textlink=True),
        )
        edit_summary = string.Template(
            config["untagged_log_summary"]
        ).safe_substitute(
            status=self.status,
            reason=self.reason,
            link=self.page.title(as_link=True, textlink=True),
            version=__version__,
            tag=summary_tag,
        )
        acnutils.check_runpage(site, override=run_override)
        acnutils.retry(
            acnutils.save_page,
            3,
            text=log_line,
            page=self.log_page,
            summary=edit_summary,
            bot=False,
            minor=False,
            mode="append",
        )
1088

1089
    def remove_untagged_log(self) -> None:
        """
        Removes a file from the untagged error log

        Every log line containing the page title is dropped. Nothing is saved
        when no line matched; in simulate mode the result is only logged.
        """
        line_pattern = re.compile(
            r"^.*?{0}.*\n?".format(re.escape(str(self.page.title()))),
            flags=re.MULTILINE,
        )
        new_text, changes = line_pattern.subn("", self.log_page.text)
        summary = string.Template(
            config["untagged_remove_log_summary"]
        ).safe_substitute(
            link=self.page.title(as_link=True, textlink=True),
            version=__version__,
            tag=summary_tag,
        )

        if not changes:
            return

        if simulate:
            logger.debug(summary)
            logger.debug(new_text)
            return

        acnutils.retry(
            acnutils.save_page,
            3,
            text=new_text,
            page=self.log_page,
            summary=summary,
            bot=False,
            minor=False,
        )
1122

1123
    def check_untagged_log(self) -> bool:
        """
        Returns True if the file is on the untagged log

        Only pages in namespace 6 linked from the log page are considered.
        """
        return any(
            linked == self.page
            for linked in self.log_page.linkedPages(namespaces=6)
        )
1✔
1131

1132
    def review_file(
        self, throttle: Optional[acnutils.Throttle] = None
    ) -> Optional[bool]:
        """Performs a license review on this object's file page

        Args:
            throttle: not referenced in this method body.
                NOTE(review): confirm whether callers still need to pass it.

        Returns None if the file was skipped
        Returns False if there was an error during review
        Returns True if the file was successfully reviewed (pass or fail)

        Raises:
            acnutils.RunpageError, KeyboardInterrupt, ConnectionError:
                re-raised unchanged so the caller can stop the bot.
        """
        logger.info(f"Checking {self.page.title(as_link=True)}")

        acnutils.check_runpage(site, override=run_override)
        if not self.check_can_run():
            return None

        #####
        try:
            self.check_stop_cats()
            # Get iNaturalistID
            self.find_ina_id()
            logger.info(f"ID found in wikitext: {self.obs_id} {self.raw_photo_id}")

            try:
                self.find_photo_in_obs()
            except ProcessingError as err:
                if (
                    err.reason_code in ("apierr", "notfound")
                    and self.raw_photo_id
                    and self.obs_id
                ):
                    # Observation ID probably doesn't exist.
                    # If we've got a photo ID, try that.
                    del self.obs_id
                    self.find_photo_in_obs()
                else:
                    raise
            self.compare_licenses()
            self.get_ina_author()
            # Evaluated only for its side effect: reading the property
            # triggers the archive lookup while errors are still handled here.
            self.archive

        except ProcessingError as err:
            logger.info("Processing failed:", exc_info=err)
            self.status = "error"
            self.reason = err.reason_code
        except StopReview as err:
            logger.info(f"Image already reviewed, contains {err.reason}")
            self.status = "stop"
        except (acnutils.RunpageError, KeyboardInterrupt, ConnectionError):
            # These must stop the run; re-raise with the original traceback.
            raise
        except Exception as err:
            logger.exception(err)
            self.status = "error"
            self.reason = repr(err)

        if self.status == "error" and not self.check_has_template():
            # Not previously tagged, don't need to throw an error message on it.
            logger.info("Skipping...")
            self.log_untagged_error()
            # TODO: report out failures/maintain skip list

            return False
        reviewed = self.update_review()
        if self.status == "fail" and reviewed and not self.no_del:
            self.fail_warning()

        if reviewed:
            # The file is now tagged, so it no longer belongs on the error log.
            self.remove_untagged_log()

        return reviewed
1✔
1203

1204

1205
def main(
    page: Optional[pywikibot.page.BasePage] = None,
    total: int = 0,
    start: Optional[str] = None,
) -> None:
    """Main loop for program

    With ``page`` given, only that page is reviewed. Otherwise the bot keeps
    iterating over ``files_to_check(start)`` followed by
    ``untagged_files_to_check()``: forever when ``total`` is 0, or until
    ``total`` files have been handed to ``review_file``.
    """
    if page:
        # When given a page, check only that page
        CommonsPage(pywikibot.FilePage(page)).review_file()
        return

    # Otherwise, run automatically
    logger.info("Beginning loop")
    reviewed_count = 0
    running = True
    throttle = acnutils.Throttle(config.get("edit_throttle", 60))
    while (not total) or (reviewed_count < total):
        for candidate in itertools.chain(
            files_to_check(start), untagged_files_to_check()
        ):
            do_heartbeat()
            try:
                cpage = CommonsPage(pywikibot.FilePage(candidate))
            except ValueError:
                continue

            if total and reviewed_count >= total:
                break
            reviewed_count += 1

            try:
                check_config()
                cpage.review_file()
            except (acnutils.RunpageError, RestartBot, ConnectionError) as err:
                # Blocks and runpage checks always stop
                logger.exception(err)
                raise
            except Exception as err:
                logger.exception(err)
                if not running:
                    # If this exception happened after running out
                    # of pages or another exception, stop the bot.
                    raise
                running = False
            else:
                running = True
            throttle.throttle()
        else:
            # If the for loop drops out, there are no more pages right now
            if running:
                running = False
                logger.warning("Out of pages to check!")
            else:
                # May need to adjust this number depending on load
                time.sleep(60)
×
1265

1266

1267
# Load the configuration and register comparison methods at import time so
# the module is usable both as a script and as a library.
config, conf_ts = get_config()
init_compare_methods()
if __name__ != "__main__":
    # Imported as a module: always honour the runpage and really save.
    run_override = False
    simulate = False
else:
    parser = argparse.ArgumentParser(
        description="Review files from iNaturalist on Commons",
        prog="iNaturalistReviewer",
    )
    run_method = parser.add_mutually_exclusive_group(required=True)
    run_method.add_argument(
        "--auto", action="store_true", help="run the bot automatically"
    )
    run_method.add_argument(
        "--file", action="store", help="run the bot only on the specified file"
    )
    parser.add_argument(
        "--total",
        action="store",
        type=int,
        help="review no more than this number of files in automatic mode",
        default=0,
    )
    parser.add_argument(
        "--ignore-runpage",
        action="store_true",
        dest="ignore_runpage",
        help="skip the runpage check for testing",
    )
    parser.add_argument(
        "--start",
        action="store",
        help="sortkey to start iterating at",
        default=None,
    )
    sim = parser.add_mutually_exclusive_group()
    sim.add_argument(
        "--simulate",
        action="store_true",
        help="print the output wikitext instead of saving to Commons",
    )
    sim.add_argument(
        "--no-simulate",
        action="store_true",
        dest="no_simulate",
        help="forces saving when disabled by --ignore-runpage",
    )
    parser.add_argument(
        "--version", action="version", version="%(prog)s " + __version__
    )
    args = parser.parse_args()

    run_override = args.ignore_runpage
    # Ignoring the runpage implies simulation unless --no-simulate forces
    # real saves; otherwise simulation is opt-in via --simulate.
    simulate = (not args.no_simulate) if run_override else args.simulate

    site.login()
    if args.auto:
        main(total=args.total, start=args.start)
    elif args.file and "File" in args.file:
        main(page=pywikibot.Page(site, args.file))
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc