AntiCompositeNumber / iNaturalistReviewer / build 9424093570 (push, via GitHub web-flow)

07 Jun 2024 11:04PM UTC coverage: 65.64% (-0.04%) from 65.68%

Commit: Replace manual k8s config with toolforge-jobs (#309)
Also adds a health check script.

Branches: 298 of 466 covered (63.95%). Branch coverage is included in the aggregate %.
New code: 6 of 9 new or added lines in 1 file covered (66.67%); 1 existing line in 1 file is now uncovered.
Lines: 640 of 963 relevant lines covered (66.46%), 0.66 hits per line.

Source File: /src/inrbot.py (80.14% covered)
#!/usr/bin/env python3
# coding: utf-8
# SPDX-License-Identifier: GPL-3.0-or-later
# Copyright 2023 AntiCompositeNumber

import argparse
import datetime
import hashlib
import itertools
import json
import logging
import logging.config
import os
import re
import string
import time
import urllib.parse
from hmac import compare_digest
from io import BytesIO
from pathlib import Path

import imagehash  # type: ignore
import mwparserfromhell as mwph  # type: ignore
import pywikibot  # type: ignore
import pywikibot.pagegenerators as pagegenerators  # type: ignore
import requests
import PIL.Image  # type: ignore
import waybackpy

from typing import NamedTuple, Optional, Set, Tuple, Dict, Union, cast, Callable, List
from typing import Any, Iterator

import acnutils

__version__ = "2.5.1"

logger = acnutils.getInitLogger("inrbot", level="VERBOSE", filename="inrbot.log")

site = pywikibot.Site("commons", "commons")
username = site.username()
skip: Set[str] = set()
summary_tag = f"(inrbot {__version__})"
user_agent = (
    f"Bot iNaturalistReviewer/{__version__} "
    "on Wikimedia Toolforge "
    f"(Contact: https://commons.wikimedia.org/wiki/User:{username}; "
    "https://www.inaturalist.org/people/anticompositenumber "
    "tools.inaturalistreviewer@tools.wmflabs.org) "
    f"Python requests/{requests.__version__}"
)

session = requests.Session()
session.headers.update({"user-agent": user_agent})
recent_bytes = {}
conf_ts = None

compare_methods: List[Tuple[str, Callable]] = []
pre_save_hooks: List[Callable] = []
id_hooks: List[Callable] = []
status_hooks: List[Callable] = []
lock_hooks: List[Callable] = []


class iNaturalistID(NamedTuple):
    id: str
    type: str
    url: str = ""

    def __str__(self):
        return f"https://www.inaturalist.org/{self.type}/{self.id}"

    def __eq__(self, other):
        if isinstance(other, iNaturalistID):
            return self.id == other.id and self.type == other.type
        elif isinstance(other, iNaturalistImage):
            return self.id == other.id.id and self.type == other.id.type
        else:
            return NotImplemented

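# Illustrative note (not part of the original module): given the definitions
# above, iNaturalistID(id="12345", type="observations") prints as
# "https://www.inaturalist.org/observations/12345", and two IDs compare equal
# only when both their id and type fields match.

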
class RestartBot(RuntimeError):
    pass


class ProcessingError(Exception):
    def __init__(self, reason_code: str, description: str = ""):
        self.reason_code = reason_code
        self.description = description


class StopReview(Exception):
    def __init__(self, reason: str):
        self.reason = reason


def get_config() -> Tuple[dict, datetime.datetime]:
    """Load on-wiki configuration"""
    page = pywikibot.Page(site, "User:iNaturalistReviewBot/config.json")
    conf_json = json.loads(page.text)
    logger.info(f"Loaded config from {page.title(as_link=True)}")
    logger.debug(json.dumps(conf_json, indent=2))
    ts = datetime.datetime.utcnow()
    return conf_json, ts


def check_config() -> None:
    page = pywikibot.Page(site, "User:iNaturalistReviewBot/config.json")
    if conf_ts and page.latest_revision.timestamp > conf_ts:
        raise RestartBot("Configuration has been updated, bot will restart")


def init_compare_methods() -> None:
    global compare_methods
    compare_methods = []
    if "sha1" in config["compare_methods"]:
        compare_methods.append(("sha1", compare_sha1))
    if "phash" in config["compare_methods"]:
        compare_methods.append(("phash", compare_phash))


def files_to_check(start: Optional[str] = None) -> Iterator[pywikibot.page.BasePage]:
    """Iterate list of files needing review from Commons"""
    category = pywikibot.Category(site, "Category:INaturalist review needed")
    do_heartbeat()
    for page in pagegenerators.CategorizedPageGenerator(
        category, namespaces=6, start=start
    ):
        yield page


def untagged_files_to_check() -> Iterator[pywikibot.page.BasePage]:
    if not config.get("find_untagged"):
        pages = []
    else:
        try:
            res = session.get(
                config["petscan_url"], params=config["untagged_petscan_query"]
            )
            res.raise_for_status()

            data = res.json()
            assert data["n"] == "result"
            pages = data["*"][0]["a"]["*"]
        except Exception:
            pages = []
        logger.info(f"Found {len(pages)} untagged files to check")

    do_heartbeat()
    for page_data in pages:
        yield pywikibot.FilePage(site, title=page_data["title"])


def gbif_to_ina_url(url: urllib.parse.ParseResult) -> str:
    path = url.path.split(sep="/")
    if path[1] != "occurrence":
        return ""
    api_url = f"https://api.gbif.org/v1/occurrence/{path[2]}"
    res = session.get(api_url)
    res.raise_for_status()
    return res.json().get("references", "")


def parse_ina_url(raw_url: str) -> Optional[iNaturalistID]:
    """Parses an iNaturalist URL into an iNaturalistID named tuple"""
    url = urllib.parse.urlparse(raw_url)
    path = url.path.split(sep="/")
    netloc = url.netloc.lower()
    if len(path) == 3 and any(
        netloc.endswith(domain) for domain in config["inaturalist_domains"]
    ):
        return iNaturalistID(type=path[1], id=str(path[2]))
    elif len(path) == 4 and netloc in (
        "inaturalist-open-data.s3.amazonaws.com",
        "static.inaturalist.org",
    ):
        return iNaturalistID(type=path[1], id=str(path[2]))
    elif len(path) == 3 and netloc == "www.gbif.org":
        return parse_ina_url(gbif_to_ina_url(url))
    else:
        return None

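# Illustrative examples for parse_ina_url() (assuming "inaturalist.org" is
# listed in config["inaturalist_domains"]):
#   parse_ina_url("https://www.inaturalist.org/observations/12345")
#       -> iNaturalistID(id="12345", type="observations")
#   parse_ina_url("https://www.inaturalist.org/photos/67890")
#       -> iNaturalistID(id="67890", type="photos")
#   parse_ina_url("https://example.org/foo/bar") -> None
#   www.gbif.org occurrence URLs are first resolved to an iNaturalist URL via
#   gbif_to_ina_url() and then re-parsed.

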
class Image:
    def __init__(
        self,
        raw: Optional[bytes] = None,
        image: Optional[PIL.Image.Image] = None,
        sha1: str = "",
        phash: Optional[imagehash.ImageHash] = None,
    ):
        self._raw = raw
        self._image = image
        self._sha1 = sha1
        self._phash = phash

    @property
    def phash(self) -> imagehash.ImageHash:
        if not self._phash:
            self._phash = imagehash.phash(self.image)
        return self._phash

    @property
    def image(self):
        raise NotImplementedError

    def __repr__(self) -> str:
        paras = ", ".join(
            f"{key}={repr(value)}" for key, value in self.__dict__.items()
        )
        return f"{type(self).__name__}({paras})"

    def __eq__(self, other):
        if isinstance(other, Image):
            return self.id == other.id
        elif isinstance(other, iNaturalistID):
            return self.id == other
        else:
            return NotImplemented


class iNaturalistImage(Image):
    _cache: Dict[iNaturalistID, str] = {}

    def __init__(self, id: iNaturalistID, **kwargs):
        self.id = id
        super().__init__(**kwargs)

    @property
    def raw(self) -> bytes:
        if not self._raw:
            self._raw = acnutils.retry(get_ina_image, 3, photo=self.id)
        return cast(bytes, self._raw)

    @property
    def image(self) -> PIL.Image.Image:
        if not self._image:
            self._image = PIL.Image.open(BytesIO(self.raw))
        return self._image

    @property
    def sha1(self) -> str:
        if not self._sha1:
            if self.id in self._cache:
                self._sha1 = self._cache[self.id]
            else:
                sha1sum = hashlib.sha1()
                sha1sum.update(self.raw)
                self._sha1 = sha1sum.hexdigest()
                self._cache[self.id] = self._sha1
        return self._sha1


class CommonsImage(Image):
    def __init__(self, page: pywikibot.FilePage, **kwargs):
        self.page = page
        if self.page:
            self.page.get(force=True)
        super().__init__(**kwargs)

    @property
    def raw(self):
        return NotImplemented

    @property
    def image(self) -> PIL.Image.Image:
        """Download original Commons file and open as a PIL image"""
        if not self._image:
            url = self.page.get_file_url()
            response = session.get(url)
            response.raise_for_status()
            self._image = PIL.Image.open(BytesIO(response.content))
        return self._image

    @property
    def sha1(self) -> str:
        if not self._sha1:
            self._sha1 = self.page.latest_file_info.sha1
        return self._sha1


def compare_sha1(com_img: CommonsImage, ina_img: iNaturalistImage) -> bool:
    logger.debug(f"Commons sha1sum:     {com_img.sha1}")
    logger.debug(f"iNaturalist sha1sum: {ina_img.sha1}")
    return compare_digest(com_img.sha1, ina_img.sha1)


def compare_phash(com_img: CommonsImage, ina_img: iNaturalistImage) -> bool:
    diff = com_img.phash - ina_img.phash
    logger.debug(f"PHash Hamming distance: {diff}")
    return diff <= config.get("max_phash_dist", 4)

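# Note on the two comparison strategies above (illustrative): compare_sha1()
# requires an exact byte-for-byte match of the files, while compare_phash()
# subtracts two perceptual hashes from the imagehash library (64-bit by
# default) to get a Hamming distance. A distance of 0 means the images are
# perceptually identical; anything up to config["max_phash_dist"] (default 4)
# is accepted, which tolerates re-encoding and minor resizing.

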
def get_ina_image(photo: iNaturalistID, final: bool = False) -> bytes:
    """Download original photo from iNaturalist"""
    if photo.url:
        extension = photo.url.partition("?")[0].rpartition(".")[2]
        domain = photo.url.partition("//")[2].partition("/")[0]
    else:
        extension = "jpeg"
        domain = "inaturalist-open-data.s3.amazonaws.com"
    # TODO: Replace this hardcoded URL
    url = f"https://{domain}/photos/{photo.id}/original.{extension}"
    response = session.get(url)
    if response.status_code == 403 and not final:
        return get_ina_image(photo._replace(url=url.replace("jpeg", "jpg")), final=True)
    response.raise_for_status()
    return response.content


def bytes_throttle(length: int) -> None:
    hour_limit = 4.5e9
    day_limit = 23.5e9
    global recent_bytes
    logger.debug(f"Content length: {length}")
    now = datetime.datetime.now()
    recent_bytes[datetime.datetime.now()] = length

    last_hour = 0
    last_day = 0
    for date, val in recent_bytes.copy().items():
        if now - date <= datetime.timedelta(hours=24):
            last_day += val
            if now - date <= datetime.timedelta(hours=1):
                last_hour += val
        else:
            del recent_bytes[date]

    logger.debug(f"Hour total: {last_hour}, day total: {last_day}")
    if last_day >= day_limit:
        logger.error(
            f"{last_day} bytes transferred in last 24h, approaching iNaturalist limits!"
        )
        sleep_time = 3600 * 12  # 12 hours
    elif last_hour >= hour_limit:
        logger.error(
            f"{last_hour} bytes transferred in last hour, "
            "approaching iNaturalist limits!"
        )
        sleep_time = 60 * 30  # 30 minutes
    else:
        return None
    logger.info(f"Sleeping for {sleep_time} seconds")
    time.sleep(sleep_time)
    return None


def do_heartbeat() -> None:
    """Update the timestamp on a file (if provided)

    Works with inrbot-healthcheck.sh when the HEARTBEAT_FILE environment variable is set
    """
    if os.environ.get("HEARTBEAT_FILE"):
        Path(os.environ["HEARTBEAT_FILE"]).touch()

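# Illustrative usage: start the bot with the HEARTBEAT_FILE environment
# variable set (for example HEARTBEAT_FILE=/tmp/inrbot-heartbeat, a
# hypothetical path) and do_heartbeat() touches that file on every pass,
# letting an external health check confirm the loop is still alive.

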
class Aliases:
    alias_cache: Dict[str, Dict[str, Union[float, Set[str]]]] = {}

    def __init__(self, title: str) -> None:
        self.title: str = title
        self._aliases: Optional[Set[str]] = None

    def get_aliases(self) -> None:
        canon_page = pywikibot.Page(site, f"Template:{self.title}")
        aliases = {
            page.title(with_ns=False).lower()
            for page in canon_page.backlinks(filter_redirects=True, namespaces=10)
        }
        aliases.add(canon_page.title(with_ns=False).lower())
        aliases.update(
            page.title(with_ns=False).lower().partition("/")[0]
            for page in canon_page.embeddedin(namespaces=10)
        )
        self._aliases = aliases

    @property
    def aliases(self):
        if self._aliases is None:
            cached = self.alias_cache.get(self.title)
            if cached is None or time.monotonic() - cached["last_update"] > 3600:
                self.get_aliases()
                self.alias_cache[self.title] = {
                    "last_update": time.monotonic(),
                    "aliases": self._aliases,
                }
            else:
                self._aliases = cached["aliases"]
        return self._aliases

    def is_license(self, template: mwph.nodes.Template) -> bool:
        if template.name.lower() in self.aliases:
            return True
        elif template.name.lower() == "self":
            return True
        return False


def get_observation_from_photo(photo_id: iNaturalistID) -> iNaturalistID:
    assert photo_id.type == "photos"
    try:
        res = session.get(str(photo_id))
        res.raise_for_status()
    except Exception:
        raise ProcessingError("nourl", "No observation ID could be found")
    # Yes, I know I'm parsing HTML with a regex.
    match = re.search(r"/observations/(\d*)\"", res.text)
    if not match:
        raise ProcessingError("nourl", "No observation ID could be found")
    else:
        return iNaturalistID(type="observations", id=match.group(1))

class CommonsPage:
    def __init__(
        self,
        page: pywikibot.FilePage,
        throttle: Optional[acnutils.Throttle] = None,
        ina_throttle: acnutils.Throttle = acnutils.Throttle(10),
    ) -> None:
        self.page = page
        self._com_license: Optional[str] = None
        self._ina_license: Optional[str] = None
        self._status = ""
        self._ina_author: Optional[str] = None
        self._ina_data: dict = {}
        self._is_old: Optional[bool] = None
        self._no_del: Optional[bool] = None
        self._archive = ""
        self.throttle = throttle
        self.ina_throttle = ina_throttle
        self.reason = ""
        self._photo_id: Optional[iNaturalistID] = None
        self._raw_photo_id: Optional[iNaturalistID] = None
        self._obs_id: Optional[iNaturalistID] = None
        self._locked = False
        self.photo_id_source = ""

    @property
    def locked(self) -> bool:
        return self._locked

    @locked.setter
    def locked(self, value: bool):
        if self._locked is False:
            self._locked = value
        elif value is False:
            raise TypeError("Can not unlock parameters")

    def lock(self):
        if self.locked is False:
            for hook in lock_hooks:
                hook(self)
            self.locked = True

    def _set_locking(self, attr: str, value: Any) -> None:
        if not self.locked:
            setattr(self, attr, value)
        else:
            raise TypeError(f"{attr[1:]} has already been read, and can not be changed")

    def _get_locking_str(self, attr: str, setter: Optional[Callable] = None) -> str:
        if getattr(self, attr) is None:
            if self.locked:
                setattr(self, attr, "")
            elif setter is not None:
                setter()
            else:
                raise AttributeError(attr[1:])
        return getattr(self, attr)

    def check_can_run(self) -> bool:
        """Determines if the bot should run on this page and returns a bool."""
        page = self.page
        if (
            (page.title() in skip)
            or (not page.has_permission("edit"))
            or (not page.botMayEdit())
            or (re.search(r"{{[iI][nN]aturalist[rR]eview *?\|.*?}}", page.text))
        ):
            return False
        else:
            return True

    def check_has_template(self) -> bool:
        return bool(re.search(r"{{[iI][nN]aturalist[rR]eview", self.page.text))

    def check_stop_cats(self) -> None:
        stop_cats = {
            pywikibot.Category(site, title) for title in config["stop_categories"]
        }
        page_cats = set(self.page.categories())
        page_stop = stop_cats & page_cats
        if page_stop:
            raise StopReview(str(page_stop))

    def find_ina_id(self) -> None:
        """Returns an iNaturalistID tuple from wikitext"""
        photos = []
        observations = []

        for url in self.page.extlinks():
            url_id = parse_ina_url(url)
            if (
                url_id is None
                or re.search(r"[A-z]", url_id.id)
                or url_id in photos
                or url_id in observations
            ):
                continue  # pragma: no cover
            elif url_id.type == "observations":
                observations.append(url_id)
            elif url_id.type == "photos":
                photos.append(url_id)

        for hook in id_hooks:
            hook_id = hook(self, observations=observations.copy(), photos=photos.copy())
            if hook_id is None or re.search(r"[A-z]", hook_id.id):
                continue  # pragma: no cover
            elif hook_id.type == "observations":
                observations.insert(0, hook_id)
            elif hook_id.type == "photos":
                photos.insert(0, hook_id)
                observations = []

        if photos and observations:
            self.obs_id = observations[0]
            self.raw_photo_id = photos[0]
        elif observations:
            self.obs_id = observations[0]
            self.raw_photo_id = None
        elif photos:
            self.obs_id = None
            self.raw_photo_id = photos[0]
        else:
            raise ProcessingError("nourl", "No observation ID could be found")

    @property
    def photo_id(self) -> Optional[iNaturalistID]:
        return self._photo_id

    @photo_id.setter
    def photo_id(self, value: iNaturalistID):
        self._set_locking("_photo_id", value)

    @property
    def raw_photo_id(self) -> Optional[iNaturalistID]:
        return self._raw_photo_id

    @raw_photo_id.setter
    def raw_photo_id(self, value: iNaturalistID):
        self._raw_photo_id = value

    @property
    def obs_id(self) -> Optional[iNaturalistID]:
        if not self._obs_id and not self.locked:
            if self.raw_photo_id:
                self._obs_id = get_observation_from_photo(self.raw_photo_id)
        return self._obs_id

    @obs_id.setter
    def obs_id(self, value: iNaturalistID) -> None:
        self._set_locking("_obs_id", value)

    @obs_id.deleter
    def obs_id(self) -> None:
        if not self.locked:
            self._obs_id = None
            del self.ina_data
        else:
            raise TypeError

    @property
    def ina_data(self) -> dict:
        """Make API request to iNaturalist from an ID and ID type

        Returns a dict of the API result
        """
        if not self._ina_data:
            assert self.obs_id
            if self.obs_id.type == "observations":
                url = f"https://api.inaturalist.org/v1/observations/{self.obs_id.id}"
            else:
                raise ProcessingError("apierr", "iNaturalist ID is wrong type")

            if self.throttle:
                self.throttle.throttle()
            try:
                response = session.get(url, headers={"Accept": "application/json"})
                response.raise_for_status()
                response_json = response.json()
            except (ValueError, requests.exceptions.HTTPError) as err:
                raise ProcessingError("apierr", "iNaturalist API error") from err
            else:
                if response_json.get("total_results") != 1:
                    logger.debug(response_json)
                    raise ProcessingError("apierr", f"iNaturalist API error in {url}")
                res = response_json.get("results", [None])[0]
                if not res:
                    raise ProcessingError(
                        "apierr", f"No data received from iNaturalist in {url}"
                    )
                self._ina_data = res
        return self._ina_data

    @ina_data.deleter
    def ina_data(self) -> None:
        self._ina_data = {}

    def get_ina_license(self) -> None:
        """Find the image license in the iNaturalist API response

        If a license is found, the Commons template name is returned.
        If no license is found, an empty string is returned.

        The API does not return CC version numbers, but the website has 4.0 links.
        CC 4.0 licenses are assumed.
        """
        assert self.photo_id
        licenses = config["ina_licenses"]
        photos: list = self.ina_data.get("photos", [])
        for photo_data in photos:
            if str(photo_data.get("id")) == self.photo_id.id:
                license_code = photo_data.get("license_code", "null")
                break
        else:
            raise ProcessingError("inatlicense", "No iNaturalist license found")

        if not license_code:
            license_code = "null"

        try:
            self.ina_license = licenses[license_code]
        except KeyError as e:
            raise ProcessingError("inatlicense", "No iNaturalist license found") from e
        logger.info(f"iNaturalist License: {self.ina_license}")

    @property
    def ina_license(self) -> str:
        return self._get_locking_str("_ina_license", self.get_ina_license)

    @ina_license.setter
    def ina_license(self, value: str) -> None:
        self._set_locking("_ina_license", value)

    def find_photo_in_obs(self, recurse: bool = True) -> None:
        """Find the matching image in an iNaturalist observation

        Returns an iNaturalistID named tuple with the photo ID.
        """
        images = [
            iNaturalistImage(
                id=iNaturalistID(type="photos", id=str(photo["id"]), url=photo["url"])
            )
            for photo in self.ina_data["photos"]
        ]
        if len(images) < 1:
            raise ProcessingError("notfound", "No photos in observation")
        elif self.raw_photo_id:
            # False sorts before True, otherwise remains in original order
            # This will sort the matching photo before other photos in the obs,
            # but will still check those other images if no match.
            images.sort(key=lambda image: self.raw_photo_id != image)

        commons_image = CommonsImage(page=self.page)

        for comp_method, comp_func in compare_methods:
            logger.info(f"Comparing photos using {comp_method}")
            for image in images:
                logger.debug(f"Comparing {str(image.id)}")
                try:
                    res = comp_func(com_img=commons_image, ina_img=image)
                except Exception:
                    res = False
                if res:
                    logger.info(f"Match found: {str(image.id)}")
                    self.reason = comp_method
                    self.photo_id = image.id
                    return
                elif self.throttle:
                    self.throttle.throttle()
        if self.raw_photo_id and self.raw_photo_id not in images and recurse:
            del self.obs_id
            self.find_photo_in_obs(recurse=False)
        else:
            raise ProcessingError("notmatching", "No matching photos found")

    def get_ina_author(self):
        self.ina_author = self.ina_data.get("user", {}).get("login", "")
        logger.info(f"Author: {self.ina_author}")

    @property
    def ina_author(self) -> str:
        """Find the image author in the iNaturalist API response

        Returns a string with the username of the iNaturalist contributor
        """
        return self._get_locking_str("_ina_author", self.get_ina_author)

    @ina_author.setter
    def ina_author(self, value: str) -> None:
        self._set_locking("_ina_author", value)

    def get_com_license(self):
        """Find the license template currently used on the Commons page

        Returns the first license template used on the page. If no templates
        are found, return an empty string.
        """

        category = pywikibot.Category(site, "Category:Primary license tags (flat list)")
        templates = set(self.page.itertemplates())
        license_tags = set(category.members(namespaces=10))

        for template in templates:
            if template in license_tags:
                self._com_license = template.title(with_ns=False)
                break
        else:
            logger.info("No Commons license found!")
            self._com_license = ""
        logger.info(f"Commons License: {self.com_license}")

    @property
    def com_license(self) -> str:
        return self._get_locking_str("_com_license", self.get_com_license)

    @com_license.setter
    def com_license(self, value: str) -> None:
        self._set_locking("_com_license", value)

    def compare_licenses(self) -> None:
        free_licenses = set(config["free_licenses"])

        if not self.ina_license:
            # iNaturalist license wasn't found, call in the humans
            self.status = "error"
        elif self.ina_license not in free_licenses:
            # Source license is non-free, failed license review
            self.status = "fail"
        elif self.ina_license == self.com_license:
            # Licenses are the same, license review passes
            self.status = "pass"
        else:
            # Commons license doesn't match iNaturalist, update to match
            self.status = "pass-change"

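    # Illustrative outcomes of compare_licenses() above: a free iNaturalist
    # license that matches the Commons template -> "pass"; a free license
    # that differs from the Commons template -> "pass-change" (the Commons
    # tag is rewritten later); a license not listed in
    # config["free_licenses"] -> "fail"; no detectable iNaturalist license
    # -> "error".
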
    @property
    def status(self) -> str:
        """Checks the Commons license against the iNaturalist license

        Returns a string with the status
        Statuses:
            fail:        iNaturalist license is non-free
            error:       Bot could not determine the correct status
            pass:        Licenses match
            pass-change: Commons license changed to free iNaturalist license
        """
        if not self.locked:
            if not self._status:
                self.compare_licenses()
            for hook in status_hooks:
                hook(self)
        return self._status

    @status.setter
    def status(self, value):
        self._set_locking("_status", value)

    @status.deleter
    def status(self):
        self.status = ""

    def _file_is_old(self) -> bool:
        if not config.get("old_fail", False):
            return False

        timestamp = self.page.latest_file_info.timestamp
        if (datetime.datetime.now() - timestamp) > datetime.timedelta(
            days=config["old_fail_age"]
        ):
            return True
        else:
            return False

    @property
    def is_old(self) -> bool:
        if self._is_old is None:
            if self.status == "fail":
                self._is_old = self._file_is_old()
            else:
                self._is_old = False
        return self._is_old

    @is_old.setter
    def is_old(self, value: bool) -> None:
        self._set_locking("_is_old", value)

    @property
    def no_del(self) -> bool:
        if self._no_del is None:
            if self.status == "fail":
                page_templates = set(self.page.itertemplates())
                check_templates = {
                    pywikibot.Page(site, "Template:OTRS received"),
                    pywikibot.Page(site, "Template:Deletion template tag"),
                }
                self._no_del = not page_templates.isdisjoint(check_templates)
            else:
                self._no_del = False
        return self._no_del

    @no_del.setter
    def no_del(self, value) -> None:
        self._set_locking("_no_del", value)

    @property
    def archive(self) -> str:
        if not self._archive:
            if config.get("use_wayback") and self.status in ("pass", "pass-change"):
                self.get_old_archive()
                if not self._archive:
                    self.save_archive()
            elif self.status == "fail" or (
                self.status != "error" and config.get("wayback_get", True)
            ):
                self.get_old_archive()
        return self._archive

    @archive.setter
    def archive(self, value: str) -> None:
        self._archive = value

    def save_archive(self) -> None:
        try:
            url = waybackpy.Url(str(self.photo_id), user_agent).save()
            assert url.archive_url is not None
            self.archive = url.archive_url
        except Exception as err:
            logger.warning("Failed to get archive", exc_info=err)
            self.archive = ""

    def get_old_archive(self) -> None:
        try:
            url = waybackpy.Url(str(self.photo_id), user_agent).oldest()
            assert url.archive_url is not None
            self.archive = url.archive_url
        except Exception as err:
            logger.info("Failed to get archive", exc_info=err)
            self.archive = ""
        else:
            if self.status == "fail":
                self.status = "fail-archive"

    def uploader_talk(self) -> pywikibot.page.Page:
        return pywikibot.Page(site, f"User talk:{self.page.oldest_file_info.user}")

    def update_review(self) -> bool:
        """Updates the wikitext with the review status"""
        logger.info(f"Status: {self.status} ({self.reason})")
        self.lock()
        code = mwph.parse(self.page.text)
        template = self.make_template()
        changed = False
        if self.check_has_template():
            # Already tagged for review, replace the existing template
            for review_template in code.ifilter_templates(
                matches=lambda t: t.name.strip().lower() == "inaturalistreview"
            ):
                code.replace(review_template, template)
                changed = True
        else:
            # Check for normal {{LicenseReview}} template
            for review_template in code.ifilter_templates(
                matches=lambda t: re.search(r"[Ll]icense ?[Rr]eview", str(t))
            ):
                code.replace(review_template, template)
                changed = True

            if not changed:
                # Not already tagged, try to put the review template under the license
                if self.com_license:
                    aliases = Aliases(self.com_license)
                    for pt2 in code.ifilter_templates(matches=aliases.is_license):
                        code.insert_after(pt2, "\n" + template)
                        changed = True
                else:
                    for node in code.ifilter(
                        matches=lambda n: re.search(
                            r"(\[\[Category:|\{\{Uncategorized)", str(n)
                        )
                    ):
                        code.insert_before(node, template + "\n\n")
                        changed = True
                        break
                    else:
                        code.append("\n\n" + template)
                        changed = True

        if not changed:
            logger.info("Page not changed")
            return False

        if self.status == "pass-change":
            if self.com_license:
                aliases = Aliases(self.com_license)
                for pt2 in code.ifilter_templates(matches=aliases.is_license):
                    code.replace(pt2, ("{{%s}}" % self.ina_license))
            else:
                code.insert_before(template, ("{{%s}}" % self.ina_license))

        if self.status == "fail" and not self.no_del:
            code.insert(
                0,
                string.Template(
                    config["old_fail_tag"] if self.is_old else config["fail_tag"]
                ).safe_substitute(
                    review_license=self.ina_license,
                    source_url=str(self.photo_id) if self.photo_id else "",
                ),
            )

        if self.status in ["pass", "pass-change"] and config.get("tag_source"):
            self.add_source_tag(code)

        if self.throttle is not None:
            self.throttle.throttle()
        try:
            self.save_page(str(code))
        except Exception as err:
            logging.exception(err)
            return False
        else:
            return True

    def make_template(self) -> str:
        """Constructs the iNaturalistreview template"""
        self.lock()
        if self.status == "stop":
            return ""
        template = string.Template(config[self.status])
        text = template.safe_substitute(
            status=self.status,
            author=self.ina_author,
            source_url=str(self.photo_id) if self.photo_id else "",
            review_date=datetime.date.today().isoformat(),
            reviewer=username,
            review_license=self.ina_license,
            upload_license=self.com_license,
            reason=self.reason,
            archive=self.archive,
        )
        return text

    def add_source_tag(self, code: mwph.wikicode.Wikicode) -> None:
        source_tag = ""
        templates = set(self.page.itertemplates())
        if not self.obs_id or not config["tag_source"]:
            return
        if pywikibot.Page(site, "Template:INaturalist") not in templates:
            source_tag += "\n{{iNaturalist|%s}}" % self.obs_id.id

        gbif_links = [
            link
            for link in self.ina_data.get("outlinks", [])
            if link["source"] == "GBIF"
        ]
        if gbif_links and pywikibot.Page(site, "Template:Gbif") not in templates:
            gbif_id = gbif_links[0]["url"].split("/")[-1]
            source_tag += "\n{{gbif|%s}}" % gbif_id

        if not source_tag:
            return

        try:
            # Place templates at the bottom of =={{int:filedesc}}==,
            # after any other templates but before categories/other text
            prev = code.get_sections(matches="filedesc")[0].filter_templates(
                recursive=False
            )[-1]
        except IndexError:
            # If there is no Summary section, just place after {{iNaturalistreview}}
            prev = code.filter_templates(
                matches=lambda t: t.name.strip().lower() == "inaturalistreview"
            )[0]

        code.insert_after(prev, source_tag)

    def save_page(self, new_text: str) -> None:
        """Replaces the wikitext of the specified page with new_text

        If the global simulate variable is true, the wikitext will be printed
        instead of saved to Commons.
        """

        summary = string.Template(config["review_summary"]).safe_substitute(
            status=self.status,
            review_license=self.ina_license,
            version=__version__,
            tag=summary_tag,
        )
        for hook in pre_save_hooks:
            hook(
                self,
                new_text=new_text,
                summary=summary,
            )
        if not simulate:
            acnutils.check_runpage(site, override=run_override)
            logger.info(f"Saving {self.page.title()}")
            acnutils.retry(
                acnutils.save_page,
                3,
                text=new_text,
                page=self.page,
                summary=summary,
                bot=False,
                minor=False,
            )
        else:
            logger.info("Saving disabled")
            logger.debug(summary)
            logger.debug(new_text)

    def fail_warning(self) -> None:
        user_talk = self.uploader_talk()
        message = string.Template(
            config["old_fail_warn"] if self.is_old else config["fail_warn"]
        ).safe_substitute(
            filename=self.page.title(with_ns=True),
            review_license=self.ina_license,
            source_url=str(self.photo_id) if self.photo_id else "",
        )
        summary = string.Template(config["review_summary"]).safe_substitute(
            status="fail",
            review_license=self.ina_license,
            version=__version__,
            tag=summary_tag,
        )
        if not simulate:
            acnutils.check_runpage(site, override=run_override)
            logger.info(f"Saving {user_talk.title()}")
            acnutils.retry(
                acnutils.save_page,
                3,
                text=message,
                page=user_talk,
                summary=summary,
                bot=False,
                minor=False,
                mode="append",
            )
        else:
            logger.info("Saving disabled")
            logger.info(summary)
            logger.info(message)

    def log_untagged_error(self) -> None:
        if simulate:
            return
        log_page = pywikibot.Page(site, config["untagged_log_page"])
        if self.page.title() not in log_page.text:
            message = string.Template(config["untagged_log_line"]).safe_substitute(
                status=self.status,
                reason=self.reason,
                link=self.page.title(as_link=True, textlink=True),
            )
            summary = string.Template(config["untagged_log_summary"]).safe_substitute(
                status=self.status,
                reason=self.reason,
                link=self.page.title(as_link=True, textlink=True),
                version=__version__,
                tag=summary_tag,
            )
            acnutils.check_runpage(site, override=run_override)
            acnutils.retry(
                acnutils.save_page,
                3,
                text=message,
                page=log_page,
                summary=summary,
                bot=False,
                minor=False,
                mode="append",
            )

    def remove_untagged_log(self) -> None:
        """
        Removes a file from the untagged error log
        """
        log_page = pywikibot.Page(site, config["untagged_log_page"])
        new_text, changes = re.subn(
            r"^.*?{0}.*\n?".format(re.escape(str(self.page.title()))),
            "",
            log_page.text,
            flags=re.MULTILINE,
        )
        summary = string.Template(
            config["untagged_remove_log_summary"]
        ).safe_substitute(
            link=self.page.title(as_link=True, textlink=True),
            version=__version__,
            tag=summary_tag,
        )

        if changes == 0:
            return
        if simulate:
            logger.debug(summary)
            logger.debug(new_text)
        else:
            acnutils.retry(
                acnutils.save_page,
                3,
                text=new_text,
                page=log_page,
                summary=summary,
                bot=False,
                minor=False,
            )

    def review_file(
        self, throttle: Optional[acnutils.Throttle] = None
    ) -> Optional[bool]:
        """Performs a license review on the input page

        inpage must be in the file namespace.

        Returns None if the file was skipped
        Returns False if there was an error during review
        Returns True if the file was successfully reviewed (pass or fail)
        """
        logger.info(f"Checking {self.page.title(as_link=True)}")

        acnutils.check_runpage(site, override=run_override)
        if not self.check_can_run():
            return None

        #####
        try:
            self.check_stop_cats()
            # Get iNaturalistID
            self.find_ina_id()
            logger.info(f"ID found in wikitext: {self.obs_id} {self.raw_photo_id}")

            try:
                self.find_photo_in_obs()
            except ProcessingError as err:
                if (
                    err.reason_code in ("apierr", "notfound")
                    and self.raw_photo_id
                    and self.obs_id
                ):
                    # Observation ID probably doesn't exist.
                    # If we've got a photo ID, try that.
                    del self.obs_id
                    self.find_photo_in_obs()
                else:
                    raise
            self.compare_licenses()
            self.get_ina_author()
            self.archive

        except ProcessingError as err:
            logger.info("Processing failed:", exc_info=err)
            self.status = "error"
            self.reason = err.reason_code
        except StopReview as err:
            logger.info(f"Image already reviewed, contains {err.reason}")
            self.status = "stop"
        except (acnutils.RunpageError, KeyboardInterrupt, ConnectionError) as err:
            raise err
        except Exception as err:
            logger.exception(err)
            self.status = "error"
            self.reason = repr(err)

        if self.status == "error" and not self.check_has_template():
            # Not previously tagged, don't need to throw an error message on it.
            logger.info("Skipping...")
            skip.add(self.page.title())
            self.log_untagged_error()
            # TODO: report out failures/maintain skip list

            return False
        reviewed = self.update_review()
        if self.status == "fail" and reviewed and not self.no_del:
            self.fail_warning()

        if reviewed:
            self.remove_untagged_log()

        return reviewed

def main(
    page: Optional[pywikibot.page.BasePage] = None,
    total: int = 0,
    start: Optional[str] = None,
) -> None:
    """Main loop for program"""
    # Enumerate starts at 0, so to get N items, count to N-1.
    if page:
        # When given a page, check only that page
        cpage = CommonsPage(pywikibot.FilePage(page))
        cpage.review_file()
    else:
        # Otherwise, run automatically
        # If total is 0, run continuously.
        # If total is non-zero, check that many files
        logger.info("Beginning loop")
        i = 0
        running = True
        throttle = acnutils.Throttle(config.get("edit_throttle", 60))
        while (not total) or (i < total):
            for page in itertools.chain(
                files_to_check(start), untagged_files_to_check()
            ):
                do_heartbeat()
                try:
                    cpage = CommonsPage(pywikibot.FilePage(page))
                except ValueError:
                    continue

                if total and i >= total:
                    break
                i += 1

                try:
                    check_config()
                    cpage.review_file()
                except (acnutils.RunpageError, RestartBot, ConnectionError) as err:
                    # Blocks and runpage checks always stop
                    logger.exception(err)
                    raise
                except Exception as err:
                    if running:
                        logger.exception(err)
                        running = False
                    else:
                        # If this exception happened after running out
                        # of pages or another exception, stop the bot.
                        logger.exception(err)
                        raise
                else:
                    running = True
                throttle.throttle()
            else:
                # If the for loop drops out, there are no more pages right now
                if running:
                    running = False
                    logger.warning("Out of pages to check!")
                # May need to adjust this number depending on load
                else:
                    time.sleep(60)


config, conf_ts = get_config()
init_compare_methods()
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Review files from iNaturalist on Commons",
        prog="iNaturalistReviewer",
    )
    run_method = parser.add_mutually_exclusive_group(required=True)
    run_method.add_argument(
        "--auto", action="store_true", help="run the bot automatically"
    )
    run_method.add_argument(
        "--file", action="store", help="run the bot only on the specified file"
    )
    parser.add_argument(
        "--total",
        action="store",
        type=int,
        help="review no more than this number of files in automatic mode",
        default=0,
    )
    parser.add_argument(
        "--ignore-runpage",
        action="store_true",
        dest="ignore_runpage",
        help="skip the runpage check for testing",
    )
    parser.add_argument(
        "--start",
        action="store",
        help="sortkey to start iterating at",
        default=None,
    )
    sim = parser.add_mutually_exclusive_group()
    sim.add_argument(
        "--simulate",
        action="store_true",
        help="print the output wikitext instead of saving to Commons",
    )
    sim.add_argument(
        "--no-simulate",
        action="store_true",
        dest="no_simulate",
        help="forces saving when disabled by --ignore-runpage",
    )
    parser.add_argument(
        "--version", action="version", version="%(prog)s " + __version__
    )
    args = parser.parse_args()

    run_override = args.ignore_runpage
    if run_override:
        if args.no_simulate:
            simulate = False
        else:
            simulate = True
    else:
        simulate = args.simulate

    site.login()
    if args.auto:
        main(total=args.total, start=args.start)
    elif args.file and "File" in args.file:
        main(page=pywikibot.Page(site, args.file))
else:
    run_override = False
    simulate = False