• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hasgeek / baseframe / 16218997531

11 Jul 2025 11:32AM UTC coverage: 67.889% (+1.0%) from 66.866%
16218997531

push

github

web-flow
Fix linter issues; fix ValidCoordinates; add OptionalCoordinates (#511)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

120 of 149 new or added lines in 14 files covered. (80.54%)

3 existing lines in 2 files now uncovered.

1592 of 2345 relevant lines covered (67.89%)

2.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.84
/src/baseframe/forms/validators.py
1
"""WTForms validators."""
2
# ruff: noqa: ARG002
3

4
from __future__ import annotations
4✔
5

6
import datetime
4✔
7
import re
4✔
8
from collections import namedtuple
4✔
9
from collections.abc import Iterable
4✔
10
from decimal import Decimal
4✔
11
from fractions import Fraction
4✔
12
from typing import Any, Callable, Optional as OptionalType, Union, cast
4✔
13
from urllib.parse import urljoin, urlparse
4✔
14

15
import dns.resolver
4✔
16
import emoji
4✔
17
import html5lib
4✔
18
import requests
4✔
19
from flask import current_app, request
4✔
20
from pyisemail import is_email
4✔
21
from wtforms import Field as WTField, Form as WTForm
4✔
22
from wtforms.validators import (  # skipcq: PY-W2000
4✔
23
    URL,
24
    DataRequired,
25
    EqualTo,
26
    InputRequired,
27
    Length,
28
    NumberRange,
29
    Optional,
30
    StopValidation,
31
    ValidationError,
32
)
33

34
from coaster.utils import deobfuscate_email, make_name, md5sum
4✔
35

36
from ..extensions import _, __, asset_cache
4✔
37
from ..signals import exception_catchall
4✔
38
from ..utils import is_public_email_domain
4✔
39
from .typing import ValidatorList
4✔
40

41
# Public API of this module. Entries marked ``# WTForms`` are re-exported unchanged
# from :mod:`wtforms.validators`; the rest are defined in this file.
# NOTE(review): the comparison validators defined below (``GreaterThan``,
# ``GreaterThanEqualTo``, ``LesserThan``, ``LesserThanEqualTo``, ``NotEqualTo``) are
# not listed here -- confirm whether that omission is intentional.
__all__ = [
    'URL',  # WTForms
    'AllUrlsValid',
    'AllowedIf',
    'DataRequired',  # WTForms
    'EqualTo',  # WTForms
    'ForEach',
    'InputRequired',  # WTForms
    'IsEmoji',
    'IsNotPublicEmailDomain',
    'IsPublicEmailDomain',
    'Length',  # WTForms
    'NoObfuscatedEmail',
    'NumberRange',  # WTForms
    'Optional',  # WTForms
    'OptionalCoordinates',
    'OptionalIf',
    'Recaptcha',
    'RequiredIf',
    'StopValidation',  # WTForms
    'ValidCoordinates',
    'ValidEmail',
    'ValidEmailDomain',
    'ValidName',
    'ValidUrl',
    'ValidationError',  # WTForms
]
68

69

70
# Case-insensitive pattern for text shaped like an email address. Used by
# :class:`NoObfuscatedEmail` to pick candidates out of de-obfuscated text
EMAIL_RE = re.compile(r'\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,63}\b', re.I)

# Values that :func:`is_empty` must not report as empty, even where they are falsy
_zero_values = (0, 0.0, Decimal('0'), 0j, Fraction(0, 1), datetime.time(0, 0, 0))

# Endpoint that :class:`Recaptcha` posts the user's response to for verification
RECAPTCHA_VERIFY_SERVER = 'https://www.google.com/recaptcha/api/siteverify'
# Reproduced from flask_wtf.validators, with gettext applied
RECAPTCHA_ERROR_CODES = {
    'missing-input-secret': __("The secret parameter is missing"),
    'invalid-input-secret': __("The secret parameter is invalid or malformed"),
    'missing-input-response': __("The response parameter is missing"),
    'invalid-input-response': __("The response parameter is invalid or malformed"),
}

# An iterable of ``(patterns, message)`` pairs, as accepted by :class:`ValidUrl`
InvalidUrlPatterns = Iterable[tuple[Iterable[Any], str]]
# Constructor-time form of an allow-list: an iterable of strings, a callable taking no
# parameters that returns one (or None), or None for no constraint
AllowedListInit = OptionalType[
    Union[Iterable[str], Callable[[], OptionalType[Iterable[str]]]]
]
# Validation-time form of an allow-list, after any callable has been called
AllowedList = OptionalType[Iterable[str]]
4✔
88

89

90
def is_empty(value: Any) -> bool:
    """
    Report whether a value counts as "no data".

    A value is empty when it is falsy, unless it compares equal to one of the entries
    in ``_zero_values`` (numeric zeroes and midnight), which are treated as real data.
    """
    if value in _zero_values:
        # A zero is a legitimate value, not a missing one
        return False
    return not value
4✔
93

94

95
# Minimal stand-in for a WTForms field. :class:`ForEach` builds one per element so that
# validators can be run against a single element: ``data`` and ``raw_data`` carry the
# element, ``errors`` is a fresh list, and ``gettext``/``ngettext`` are taken from the
# real field.
FakeField = namedtuple(  # noqa: PYI024
    'FakeField', ['data', 'raw_data', 'errors', 'gettext', 'ngettext']
)
98

99

100
class ForEach:
    """
    Runs specified validators on each element of an iterable value.

    If a validator raises :exc:`StopValidation`, it stops other validators within the
    chain given to :class:`ForEach`, but not validators specified alongside. Any
    message carried by the exception is appended to the field's errors, and validation
    moves on to the next element.

    Other exceptions raised by a validator (including :exc:`ValidationError`) are not
    caught here and propagate to the caller.

    :param validators: Validators to run, in order, on each element
    """

    def __init__(self, validators: ValidatorList) -> None:
        self.validators = validators

    def __call__(self, form: WTForm, field: WTField) -> None:
        # ``field.data`` may be None when the field holds no value. Treat that as
        # having no elements instead of failing with ``TypeError`` on iteration
        elements = () if field.data is None else field.data
        for element in elements:
            # Validators expect a field, so wrap each element in a stand-in that
            # borrows the real field's translation callables
            fake_field = FakeField(element, element, [], field.gettext, field.ngettext)
            try:
                for validator in self.validators:
                    validator(form, fake_field)
            except StopValidation as exc:
                # StopValidation without a message only halts the chain; with a
                # message, the message is reported against the real field
                if exc.args and exc.args[0]:
                    field.errors.append(exc.args[0])
×
120

121

122
class AllowedIf:
    """
    Validator that permits a value in this field only when another field has one.

    Validation stops with an error when this field has data but the other field is
    empty as per :func:`is_empty`.

    :param str fieldname: Name of the other field
    :param str message: Validation error message. Will be formatted with an optional
        ``{field}`` label
    """

    default_message = __("This requires ‘{field}’ to be specified")

    def __init__(self, fieldname: str, message: OptionalType[str] = None) -> None:
        self.fieldname = fieldname
        self.message = message or self.default_message

    def __call__(self, form: WTForm, field: WTField) -> None:
        if not field.data:
            # Nothing was provided here, so there is nothing to disallow
            return
        other_field = form[self.fieldname]
        if is_empty(other_field.data):
            raise StopValidation(self.message.format(field=other_field.label.text))
142

143

144
class OptionalIf(Optional):
    """
    Validator that treats this field as optional when another field has data.

    When the other field is empty, this validator does nothing. To make this field
    required in that case, chain it with :class:`DataRequired`::

        field = forms.StringField(
            "Field",
            validators=[
                forms.validators.OptionalIf('other'),
                forms.validators.DataRequired(),
            ],
        )

    :param str fieldname: Name of the other field
    :param str message: Validation error message
    """

    default_message = __("This is required")

    def __init__(self, fieldname: str, message: OptionalType[str] = None) -> None:
        super().__init__()
        self.fieldname = fieldname
        self.message = message or self.default_message

    def __call__(self, form: WTForm, field: WTField) -> None:
        if is_empty(form[self.fieldname].data):
            # The other field has no data, so this validator does not apply
            return
        super().__call__(form, field)
4✔
173

174

175
class RequiredIf(DataRequired):
    """
    Validator that treats this field as required when another field has data.

    When the other field is empty, this validator does nothing. To make this field
    explicitly optional in that case, chain it with :class:`Optional`::

        field = forms.StringField(
            "Field",
            validators=[
                forms.validators.RequiredIf('other'),
                forms.validators.Optional(),
            ],
        )

    :param str fieldname: Name of the other field
    :param str message: Validation error message
    """

    default_message = __("This is required")

    def __init__(self, fieldname: str, message: OptionalType[str] = None) -> None:
        super().__init__(message=message or self.default_message)
        self.fieldname = fieldname
        # The requirement is conditional, so drop the ``required`` flag that the
        # parent class sets
        self.field_flags.pop('required')

    def __call__(self, form: WTForm, field: WTField) -> None:
        if is_empty(form[self.fieldname].data):
            # The other field has no data, so this field is not required
            return
        super().__call__(form, field)
4✔
205

206

207
class _Comparison:
    """
    Base class for validators that compare this field's value with another field.

    Subclasses implement :meth:`compare`. When it returns a falsy result, a
    :exc:`ValidationError` is raised with the message formatted using
    ``{other_label}`` (the other field's label text, falling back to its name) and
    ``{other_name}``.

    :param fieldname: The name of the other field to compare to
    :param message: Error message to raise in case of a validation error
    """

    default_message = __("Comparison failed")

    def __init__(self, fieldname: str, message: OptionalType[str] = None) -> None:
        self.fieldname = fieldname
        self.message = message or self.default_message

    def __call__(self, form: WTForm, field: WTField) -> None:
        other = form[self.fieldname]
        if self.compare(field.data, other.data):
            return
        # Prefer the other field's label; use its name if the label is absent or blank
        label_text = other.label.text if hasattr(other, 'label') else None
        raise ValidationError(
            self.message.format(
                other_label=label_text or self.fieldname,
                other_name=self.fieldname,
            )
        )

    def compare(self, value: Any, other: Any) -> bool:
        """Return True if ``value`` relates to ``other`` as the validator requires."""
        raise NotImplementedError("Subclasses must define ``compare``")
228

229

230
class GreaterThan(_Comparison):
    """
    Require this field's value to be strictly greater than another field's value.

    Passes when ``field.data > otherfield.data``.

    :param fieldname:
        Name of the field to compare against.
    :param message:
        Error message for a failed comparison. May contain the placeholders
        `{other_label}` and `{other_name}`.
    """

    default_message = __("This must be greater than {other_label}")

    def compare(self, value: Any, other: Any) -> bool:
        return value > other
×
246

247

248
class GreaterThanEqualTo(_Comparison):
    """
    Require this field's value to be no less than another field's value.

    Passes when ``field.data >= otherfield.data``.

    :param fieldname:
        Name of the field to compare against.
    :param message:
        Error message for a failed comparison. May contain the placeholders
        `{other_label}` and `{other_name}`.
    """

    default_message = __("This must be greater than or equal to {other_label}")

    def compare(self, value: Any, other: Any) -> bool:
        return value >= other
×
264

265

266
class LesserThan(_Comparison):
    """
    Require this field's value to be strictly less than another field's value.

    Passes when ``field.data < otherfield.data``.

    :param fieldname:
        Name of the field to compare against.
    :param message:
        Error message for a failed comparison. May contain the placeholders
        `{other_label}` and `{other_name}`.
    """

    default_message = __("This must be lesser than {other_label}")

    def compare(self, value: Any, other: Any) -> bool:
        return value < other
×
282

283

284
class LesserThanEqualTo(_Comparison):
    """
    Require this field's value to be no greater than another field's value.

    Passes when ``field.data <= otherfield.data``.

    :param fieldname:
        Name of the field to compare against.
    :param message:
        Error message for a failed comparison. May contain the placeholders
        `{other_label}` and `{other_name}`.
    """

    default_message = __("This must be lesser than or equal to {other_label}")

    def compare(self, value: Any, other: Any) -> bool:
        return value <= other
×
300

301

302
class NotEqualTo(_Comparison):
    """
    Require this field's value to differ from another field's value.

    Passes when ``field.data != otherfield.data``.

    :param fieldname:
        Name of the field to compare against.
    :param message:
        Error message for a failed comparison. May contain the placeholders
        `{other_label}` and `{other_name}`.
    """

    default_message = __("This must not be the same as {other_label}")

    def compare(self, value: Any, other: Any) -> bool:
        return value != other
×
318

319

320
class IsEmoji:
    """
    Validate that the field's value is a single emoji.

    The check is delegated to ``emoji.is_emoji``.

    :param message:
        Error message to raise in case of a validation error.
    """

    default_message = __("This is not a valid emoji")

    def __init__(self, message: OptionalType[str] = None) -> None:
        self.message = message or self.default_message

    def __call__(self, form: WTForm, field: WTField) -> None:
        if emoji.is_emoji(field.data):
            return
        raise ValidationError(self.message)
4✔
336

337

338
class IsPublicEmailDomain:
    """
    Validate that field.data belongs to a public email domain.

    The lookup is made with ``default=False``, so if the domain lookup fails (mxsniff
    raising ``MxLookupError``), the domain is not considered public and this validator
    fails.

    :param message:
        Error message to raise in case of a validation error.
    :param timeout:
        Timeout passed to the domain lookup.
    """

    default_message = __("This domain is not a public email domain")

    def __init__(self, message: OptionalType[str] = None, timeout: int = 30) -> None:
        self.message = message or self.default_message
        self.timeout = timeout

    def __call__(self, form: WTForm, field: WTField) -> None:
        is_public = is_public_email_domain(
            field.data, default=False, timeout=self.timeout
        )
        if not is_public:
            raise ValidationError(self.message)
4✔
359

360

361
class IsNotPublicEmailDomain:
    """
    Validate that field.data does not belong to a public email domain.

    The lookup is made with ``default=False``, so if the domain lookup fails (mxsniff
    raising ``MxLookupError``), the domain is not considered public and this validator
    still passes, as we expect that most domains are not public email domains.

    :param message:
        Error message to raise in case of a validation error.
    :param timeout:
        Timeout passed to the domain lookup.
    """

    default_message = __("This domain is a public email domain")

    def __init__(self, message: OptionalType[str] = None, timeout: int = 30) -> None:
        self.message = message or self.default_message
        self.timeout = timeout

    def __call__(self, form: WTForm, field: WTField) -> None:
        is_public = is_public_email_domain(
            field.data, default=False, timeout=self.timeout
        )
        if is_public:
            raise ValidationError(self.message)
4✔
382

383

384
class ValidEmail:
    """
    Validator to confirm an email address is likely to be valid.

    Criteria: email address is properly formatted and the domain exists. DNS failures
    (timeout or no nameservers) are given the benefit of the doubt and pass.

    The error reported is the ``message`` parameter if given, else the diagnosis
    message from ``pyisemail``, else the default message.

    :param str message: Optional validation error message.
    """

    default_message = __("This email address does not appear to be valid")

    def __init__(self, message: OptionalType[str] = None) -> None:
        # Deliberately not defaulted here: the diagnosis message takes precedence over
        # the default message at validation time
        self.message = message

    def __call__(self, form: WTForm, field: WTField) -> None:
        try:
            diagnosis = is_email(field.data, check_dns=True, diagnose=True)
        except (dns.resolver.Timeout, dns.resolver.NoNameservers):
            return
        # 0 is valid, 3 is DNS No NS, 4 is DNS timeout
        if diagnosis.code not in (0, 3, 4):
            raise StopValidation(
                self.message or diagnosis.message or self.default_message
            )
×
406

407

408
# Legacy name
409
ValidEmailDomain = ValidEmail
4✔
410

411

412
class ValidUrl:
    """
    Validator to confirm a HTTP URL is valid and reachable.

    A URL passes when fetching it yields a 2xx status code. Status codes 403 and 999
    are also accepted, as some sites return them to automated clients. URIs using other
    protocol schemes are not visited, but can be explicitly disallowed by specifying
    ``allowed_schemes``.

    :param str message: Error message (None for default error message)
    :param str message_urltext: Unused parameter, only used in the :class:`AllUrlsValid`
        validator
    :param str message_schemes: Error message when the URL scheme is invalid
    :param str message_domains: Error message when the URL domain is not whitelisted
    :param list invalid_urls: A list of (patterns, message) tuples for URLs that will be
        rejected, where ``patterns`` is a list of strings or regular expressions
    :param set allowed_schemes: Allowed schemes in URLs (`None` implies no constraints)
    :param set allowed_domains: Whitelisted domains (`None` implies no constraints)
    :param bool visit_url: Visit the URL to confirm availability (default `True`)

    ``invalid_urls``, ``allowed_schemes`` and ``allowed_domains`` may also be callables
    that take no parameters and return the required data. They will be called once per
    validation.
    """

    user_agent = (
        'Mozilla/5.0 (X11; Linux x86_64; rv:66.0) Gecko/20100101 Hasgeek/linkchecker'
    )

    default_message = __("The URL “{url}” is not valid or is currently inaccessible")

    default_message_urltext = __(
        "The URL “{url}” linked from “{text}” is not valid or is currently inaccessible"
    )

    default_message_schemes = __("This URL’s protocol is not allowed")

    default_message_domains = __("This URL’s domain is not allowed")

    def __init__(
        self,
        message: OptionalType[str] = None,
        message_urltext: OptionalType[str] = None,
        message_schemes: OptionalType[str] = None,
        message_domains: OptionalType[str] = None,
        invalid_urls: InvalidUrlPatterns = (),
        allowed_schemes: OptionalType[AllowedListInit] = None,
        allowed_domains: OptionalType[AllowedListInit] = None,
        visit_url: bool = True,
    ) -> None:
        self.message = message or self.default_message
        self.message_urltext = message_urltext or self.default_message_urltext
        self.message_schemes = message_schemes or self.default_message_schemes
        self.message_domains = message_domains or self.default_message_domains
        self.invalid_urls = invalid_urls
        self.allowed_schemes = allowed_schemes
        self.allowed_domains = allowed_domains
        self.visit_url = visit_url

    def check_url(
        self,
        url: str,
        allowed_schemes: AllowedList,
        allowed_domains: AllowedList,
        invalid_urls: InvalidUrlPatterns,
        text: Union[str, None] = None,
    ) -> OptionalType[str]:
        """
        Check a single URL and return an error message, or None if it is acceptable.

        ``allowed_schemes``, ``allowed_domains`` and ``invalid_urls`` are taken as
        parameters even though they are available on `self`, because the attributes
        may be callables and :class:`AllUrlsValid` calls :meth:`check_url` once per
        link. Passing the resolved values lets each callable be called only once per
        validation. This optimization has no value in the base class
        :class:`ValidUrl`.

        As the validator is instantiated once per form field, it cannot mutate itself
        at runtime to cache the callables' results, and must instead pass them from one
        method to the next.

        :param url: Absolute URL to check
        :param text: Link text the URL was found under, if any, for error messages
        """
        urlparts = urlparse(url)
        if allowed_schemes and urlparts.scheme not in allowed_schemes:
            return self.message_schemes.format(
                url=url, schemes=_(', ').join(allowed_schemes)
            )
        if allowed_domains and urlparts.netloc.lower() not in allowed_domains:
            return self.message_domains.format(
                url=url, domains=_(', ').join(allowed_domains)
            )

        if not (urlparts.scheme in ('http', 'https') and self.visit_url):
            # The rest of this function only validates HTTP urls.
            return None

        cache_key = 'linkchecker/' + md5sum(url)
        try:
            cached = cast(Any, asset_cache.get(cache_key))
        except ValueError:  # Possible error from a broken pickle
            cached = None

        # `rurl` is the response URL after following redirects
        rurl = None
        code = None
        # Read from cache, but assume cache may be broken since Flask-Cache stores data
        # as a pickle, which is version-specific
        if cached and isinstance(cached, dict):
            rurl = cached.get('url')
            code = cached.get('code')

        # TODO: Also honour the robots.txt protocol and stay off URLs that aren't meant
        # to be checked. https://docs.python.org/3/library/urllib.robotparser.html
        if not (rurl and code):
            try:
                response = requests.get(
                    url,
                    timeout=5,
                    verify=False,  # nosec: B501  # noqa: S501
                    headers={'User-Agent': self.user_agent},
                )
                code = response.status_code
                rurl = response.url
            except (
                # Still a relative URL? Must be broken
                requests.exceptions.MissingSchema,
                # Name resolution or connection failed
                requests.exceptions.ConnectionError,
                # Didn't respond in time
                requests.exceptions.Timeout,
            ):
                code = None
            except Exception as exc:  # pylint: disable=broad-except  # noqa: BLE001
                # Report the unexpected failure, then treat the URL as inaccessible
                exception_catchall.send(exc)
                code = None

        # Cloudflare returns HTTP 403 for urls behind its bot protection.
        # Hence we're accepting 403 as an acceptable code.
        #
        # 999 is a non-standard too-many-requests error. We can't look past it to
        # check a URL, so we let it pass
        acceptable_codes = (
            200,
            201,
            202,
            203,
            204,
            205,
            206,
            207,
            208,
            226,
            403,  # For Cloudflare
            999,  # For LinkedIn
        )
        if rurl is None or code is None or code not in acceptable_codes:
            # The URL could not be loaded. Mention the link text if it adds anything
            if text is not None and url != text:
                return self.message_urltext.format(url=url, text=text)
            return self.message.format(url=url)

        # The URL works, so now we check if it's in a reject list. This part
        # runs _after_ attempting to load the URL as we want to catch redirects.
        for patterns, message in invalid_urls:
            for pattern in patterns:
                # For text patterns, do a substring search. For regex patterns
                # (assumed so if not text), do a regex search. Test with the final
                # URL from the response, after redirects, but report errors using
                # the URL the user provided
                if isinstance(pattern, str):
                    matched = pattern in rurl
                else:
                    matched = pattern.search(rurl) is not None
                if matched:
                    return message.format(url=url, text=text)

        # All good. The URL works and isn't invalid, so save to cache and return
        # without an error message
        asset_cache.set(cache_key, {'url': rurl, 'code': code}, timeout=86400)
        return None

    def call_inner(
        self,
        field: WTField,
        current_url: str,
        allowed_schemes: AllowedList,
        allowed_domains: AllowedList,
        invalid_urls: InvalidUrlPatterns,
    ) -> None:
        """Check the field's value as one URL, resolved against ``current_url``."""
        error = self.check_url(
            urljoin(current_url, field.data),
            allowed_schemes,
            allowed_domains,
            invalid_urls,
        )
        if error:
            raise StopValidation(error)

    def __call__(self, form: WTForm, field: WTField) -> None:
        if not field.data:
            return None

        # Relative URLs are resolved against the current request's URL, if any
        current_url = request.url if request else ''
        # Each setting may be a callable; resolve it exactly once per validation
        invalid_urls = (
            self.invalid_urls() if callable(self.invalid_urls) else self.invalid_urls
        )
        allowed_schemes = (
            self.allowed_schemes()
            if callable(self.allowed_schemes)
            else self.allowed_schemes
        )
        allowed_domains = (
            self.allowed_domains()
            if callable(self.allowed_domains)
            else self.allowed_domains
        )

        return self.call_inner(
            field, current_url, allowed_schemes, allowed_domains, invalid_urls
        )
×
630

631

632
class AllUrlsValid(ValidUrl):
    """
    Validator to confirm all URLs in a HTML snippet.

    Subclasses :class:`ValidUrl` and accepts the same parameters. Every ``<a>`` tag
    with a ``href`` attribute is checked, and one error is added to the field per
    failing link. ``<a>`` tags without a ``href`` attribute are not links and are
    skipped.
    """

    def call_inner(
        self,
        field: WTField,
        current_url: str,
        allowed_schemes: AllowedList,
        allowed_domains: AllowedList,
        invalid_urls: InvalidUrlPatterns,
    ) -> None:
        """
        Check each link in the field's HTML, resolved against ``current_url``.

        :raises StopValidation: If the field has any errors after all links have been
            checked. The exception carries no message as errors are added to the
            field directly
        """
        html_tree = html5lib.parse(field.data, namespaceHTMLElements=False)
        for tag in html_tree.iter('a'):
            href = tag.attrib.get('href')
            if href is None:
                # Not a link (eg: a named anchor). ``urljoin`` returns the base URL
                # for a missing URL, so checking this would validate the current
                # page instead of anything the user wrote
                continue
            error = self.check_url(
                urljoin(current_url, href),
                allowed_schemes,
                allowed_domains,
                invalid_urls,
                tag.text,
            )
            if error:
                field.errors.append(error)
        if field.errors:
            raise StopValidation()
4✔
662

663

664
class NoObfuscatedEmail:
    """
    Scan for obfuscated email addresses in the provided text and reject them.

    The text is de-obfuscated and searched for email-like strings. Validation stops
    with an error as soon as one of them is diagnosed as a valid email address.
    Candidates whose DNS check times out or finds no nameservers are ignored.

    :param str message: Optional validation error message
    """

    default_message = __("Email address identified")

    def __init__(self, message: OptionalType[str] = None) -> None:
        self.message = message or self.default_message

    def __call__(self, form: WTForm, field: WTField) -> None:
        plain_text = deobfuscate_email(field.data or '')
        for candidate in EMAIL_RE.findall(plain_text):
            try:
                diagnosis = is_email(candidate, check_dns=True, diagnose=True)
            except (dns.resolver.Timeout, dns.resolver.NoNameservers):  # noqa: PERF203
                # Could not confirm this one; move on to the next candidate
                continue
            if diagnosis.code == 0:
                raise StopValidation(self.message)
×
681

682

683
class ValidName:
    """
    Validator to confirm the value is already in the form produced by ``make_name``.

    The value is passed through ``coaster.utils.make_name`` and must come back
    unchanged.

    :param str message: Optional validation error message
    """

    default_message = __(
        "This name contains unsupported characters. "
        "It should have letters, numbers and non-terminal hyphens only"
    )

    def __init__(self, message: OptionalType[str] = None) -> None:
        self.message = message or self.default_message

    def __call__(self, form: WTForm, field: WTField) -> None:
        normalized = make_name(field.data)
        if normalized != field.data:
            raise StopValidation(self.message)
×
695

696

697
class ValidCoordinates:
    """
    Validator for a mandatory ``(latitude, longitude)`` pair.

    The field's data must be a non-empty sequence of exactly two items, with latitude
    within ± 90 and longitude within ± 180. Neither may be `None`.

    :param str message: Error message when the data is not a pair
    :param str message_latitude: Error message when the latitude is missing or out of
        range
    :param str message_longitude: Error message when the longitude is missing or out
        of range
    """

    default_message = __("Valid latitude and longitude expected")
    default_message_latitude = __("Latitude must be within ± 90 degrees")
    default_message_longitude = __("Longitude must be within ± 180 degrees")

    def __init__(
        self,
        message: OptionalType[str] = None,
        message_latitude: OptionalType[str] = None,
        message_longitude: OptionalType[str] = None,
    ) -> None:
        self.message = message or self.default_message
        self.message_latitude = message_latitude or self.default_message_latitude
        self.message_longitude = message_longitude or self.default_message_longitude

    def __call__(self, form: WTForm, field: WTField) -> None:
        coordinates = field.data
        # Don't allow `None`, `()` or `[]`, or lists not of size two.
        # While this rejects `0` for being falsy, that too is not a valid value
        if not coordinates or len(coordinates) != 2:
            raise StopValidation(self.message)

        latitude, longitude = coordinates
        if latitude is None or not (-90 <= latitude <= 90):
            raise StopValidation(self.message_latitude)
        if longitude is None or not (-180 <= longitude <= 180):
            raise StopValidation(self.message_longitude)
4✔
722

723

724
class OptionalCoordinates:
    """
    Validator for an optional ``(latitude, longitude)`` pair.

    Passes when no value is provided: `None`, an empty string, an empty sequence, or a
    sequence of one or two items that are each `None` or an empty string. Any other
    value must be exactly two items, with latitude within ± 90 and longitude within
    ± 180.

    :param str message: Error message when the data is not a pair
    :param str message_latitude: Error message when the latitude is missing or out of
        range
    :param str message_longitude: Error message when the longitude is missing or out
        of range
    """

    default_message = __("Valid latitude and longitude expected")
    default_message_latitude = __("Latitude must be within ± 90 degrees")
    default_message_longitude = __("Longitude must be within ± 180 degrees")

    def __init__(
        self,
        message: OptionalType[str] = None,
        message_latitude: OptionalType[str] = None,
        message_longitude: OptionalType[str] = None,
    ) -> None:
        self.message = message or self.default_message
        self.message_latitude = message_latitude or self.default_message_latitude
        self.message_longitude = message_longitude or self.default_message_longitude

    def __call__(self, form: WTForm, field: WTField) -> None:
        # Shapes that a blank submission may take
        blank_values = (
            (),
            (None,),
            ('',),
            (None, None),
            ('', None),
            (None, ''),
            ('', ''),
        )
        if (
            field.data is None
            or field.data == ''
            or tuple(field.data) in blank_values
        ):
            # No value provided, treat as optional
            return

        if len(field.data) != 2:
            raise StopValidation(self.message)

        latitude, longitude = field.data
        if latitude is None or not (-90 <= latitude <= 90):
            raise StopValidation(self.message_latitude)
        if longitude is None or not (-180 <= longitude <= 180):
            raise StopValidation(self.message_longitude)
4✔
755

756

757
class Recaptcha:
    """
    Validates a ReCaptcha.

    The user's response is read from ``g-recaptcha-response`` in the request's JSON
    body or form data, and posted to ``RECAPTCHA_VERIFY_SERVER`` for verification.
    Validation is skipped when the app is in testing mode or has the
    ``RECAPTCHA_DISABLED`` config set.

    :param str message: Error message when the response is missing or incorrect
    :param str message_network: Error message when the verification server cannot be
        reached
    """

    default_message_network = __("The server was temporarily unreachable. Try again")

    def __init__(
        self,
        message: OptionalType[str] = None,
        message_network: OptionalType[str] = None,
    ) -> None:
        self.message = (
            message
            if message is not None
            else RECAPTCHA_ERROR_CODES['missing-input-response']
        )
        self.message_network = message_network or self.default_message_network

    def __call__(self, form: WTForm, field: WTField) -> None:
        if current_app.testing or current_app.config.get('RECAPTCHA_DISABLED'):
            return

        # Look for the response in the JSON body or the form, whichever was sent
        response = ''
        if request.is_json:
            jsondata = request.json
            if isinstance(jsondata, dict):
                response = jsondata.get('g-recaptcha-response', '')
        else:
            response = request.form.get('g-recaptcha-response', '')
        remote_ip = request.remote_addr or ''

        if not response:
            raise ValidationError(self.message)

        if self._validate_recaptcha(response, remote_ip):
            return
        field.recaptcha_error = 'incorrect-captcha-sol'
        raise ValidationError(self.message)

    def _validate_recaptcha(self, response: str, remote_addr: str) -> bool:
        """
        Perform the actual validation.

        :return: True if the verification server reports success, False otherwise
        :raises RuntimeError: If the ``RECAPTCHA_PRIVATE_KEY`` config is not set
        :raises ValidationError: If the server cannot be reached, or it reports an
            error code listed in ``RECAPTCHA_ERROR_CODES``
        """
        try:
            private_key = current_app.config['RECAPTCHA_PRIVATE_KEY']
        except KeyError:
            raise RuntimeError("No RECAPTCHA_PRIVATE_KEY config set") from None

        try:
            http_response = requests.post(
                RECAPTCHA_VERIFY_SERVER,
                data={
                    'secret': private_key,
                    'remoteip': remote_addr,
                    'response': response,
                },
                timeout=30,
            )
        except (
            requests.exceptions.ConnectionError,
            requests.exceptions.Timeout,
        ):
            raise ValidationError(self.message_network) from None

        if http_response.status_code != 200:
            return False

        json_resp = http_response.json()
        if json_resp['success']:
            return True

        # Report the first error code that has a known message
        for error in json_resp.get('error-codes', []):
            if error in RECAPTCHA_ERROR_CODES:
                raise ValidationError(RECAPTCHA_ERROR_CODES[error])

        return False
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc