• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

joke2k / django-environ / 11748032612

08 Nov 2024 07:04PM UTC coverage: 92.654% (+0.4%) from 92.215%
11748032612

Pull #545

github

sergeyklay
Merge branch 'main' into develop
Pull Request #545: Release v0.12.0

269 of 312 branches covered (86.22%)

28 of 31 new or added lines in 2 files covered. (90.32%)

2 existing lines in 1 file now uncovered.

555 of 599 relevant lines covered (92.65%)

16.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.62
/environ/environ.py
1
# This file is part of the django-environ.
2
#
3
# Copyright (c) 2021-2024, Serghei Iakovlev <oss@serghei.pl>
4
# Copyright (c) 2013-2021, Daniele Faraglia <daniele.faraglia@gmail.com>
5
#
6
# For the full copyright and license information, please view
7
# the LICENSE.txt file that was distributed with this source code.
8

9
"""
3✔
10
Django-environ allows you to utilize 12factor inspired environment
11
variables to configure your Django application.
12
"""
13

14
import ast
18✔
15
import itertools
18✔
16
import logging
18✔
17
import os
18✔
18
import re
18✔
19
import sys
18✔
20
import warnings
18✔
21
from urllib.parse import (
18✔
22
    parse_qs,
23
    ParseResult,
24
    quote,
25
    unquote,
26
    unquote_plus,
27
    urlparse,
28
    urlunparse,
29
)
30

31
from .compat import (
18✔
32
    DJANGO_POSTGRES,
33
    ImproperlyConfigured,
34
    json,
35
    PYMEMCACHE_DRIVER,
36
    REDIS_DRIVER,
37
)
38
from .fileaware_mapping import FileAwareMapping
18✔
39

40
Openable = (str, os.PathLike)
18✔
41
logger = logging.getLogger(__name__)
18✔
42

43

44
def _cast(value):
18✔
45
    # Safely evaluate an expression node or a string containing a Python
46
    # literal or container display.
47
    # https://docs.python.org/3/library/ast.html#ast.literal_eval
48
    try:
18✔
49
        return ast.literal_eval(value)
18✔
50
    except (ValueError, SyntaxError):
18✔
51
        return value
18✔
52

53

54
def _cast_int(v):
18✔
55
    """Return int if possible."""
56
    return int(v) if hasattr(v, 'isdigit') and v.isdigit() else v
18✔
57

58

59
def _cast_urlstr(v):
18✔
60
    return unquote(v) if isinstance(v, str) else v
18✔
61

62

63
def _urlparse_quote(url):
18✔
64
    return urlparse(quote(url, safe=':/?&=@'))
18✔
65

66

67
class NoValue:
18✔
68
    """Represent of no value object."""
69

70
    def __repr__(self):
18✔
71
        return f'<{self.__class__.__name__}>'
×
72

73

74
class Env:
18✔
75
    """Provide scheme-based lookups of environment variables so that each
76
    caller doesn't have to pass in ``cast`` and ``default`` parameters.
77

78
    Usage:::
79

80
        import environ
81
        import os
82

83
        env = environ.Env(
84
            # set casting, default value
85
            MAIL_ENABLED=(bool, False),
86
            SMTP_LOGIN=(str, 'DEFAULT')
87
        )
88

89
        # Set the project base directory
90
        BASE_DIR = os.path.dirname(
91
            os.path.dirname(os.path.abspath(__file__))
92
        )
93

94
        # Take environment variables from .env file
95
        environ.Env.read_env(os.path.join(BASE_DIR, '.env'))
96

97
        # False if not in os.environ due to casting above
98
        MAIL_ENABLED = env('MAIL_ENABLED')
99

100
        # 'DEFAULT' if not in os.environ due to casting above
101
        SMTP_LOGIN = env('SMTP_LOGIN')
102
    """
103

104
    ENVIRON = os.environ
18✔
105
    NOTSET = NoValue()
18✔
106
    BOOLEAN_TRUE_STRINGS = ('true', 'on', 'ok', 'y', 'yes', '1')
18✔
107
    URL_CLASS = ParseResult
18✔
108

109
    POSTGRES_FAMILY = ['postgres', 'postgresql', 'psql', 'pgsql', 'postgis']
18✔
110

111
    DEFAULT_DATABASE_ENV = 'DATABASE_URL'
18✔
112
    DB_SCHEMES = {
18✔
113
        'postgres': DJANGO_POSTGRES,
114
        'postgresql': DJANGO_POSTGRES,
115
        'psql': DJANGO_POSTGRES,
116
        'pgsql': DJANGO_POSTGRES,
117
        'postgis': 'django.contrib.gis.db.backends.postgis',
118
        'cockroachdb': 'django_cockroachdb',
119
        'mysql': 'django.db.backends.mysql',
120
        'mysql2': 'django.db.backends.mysql',
121
        'mysql-connector': 'mysql.connector.django',
122
        'mysqlgis': 'django.contrib.gis.db.backends.mysql',
123
        'mssql': 'mssql',
124
        'oracle': 'django.db.backends.oracle',
125
        'pyodbc': 'sql_server.pyodbc',
126
        'redshift': 'django_redshift_backend',
127
        'spatialite': 'django.contrib.gis.db.backends.spatialite',
128
        'sqlite': 'django.db.backends.sqlite3',
129
        'ldap': 'ldapdb.backends.ldap',
130
    }
131
    _DB_BASE_OPTIONS = [
18✔
132
        'CONN_MAX_AGE',
133
        'ATOMIC_REQUESTS',
134
        'AUTOCOMMIT',
135
        'DISABLE_SERVER_SIDE_CURSORS',
136
        'CONN_HEALTH_CHECKS',
137
    ]
138

139
    DEFAULT_CACHE_ENV = 'CACHE_URL'
18✔
140
    CACHE_SCHEMES = {
18✔
141
        'dbcache': 'django.core.cache.backends.db.DatabaseCache',
142
        'dummycache': 'django.core.cache.backends.dummy.DummyCache',
143
        'filecache': 'django.core.cache.backends.filebased.FileBasedCache',
144
        'locmemcache': 'django.core.cache.backends.locmem.LocMemCache',
145
        'memcache': 'django.core.cache.backends.memcached.MemcachedCache',
146
        'pymemcache': PYMEMCACHE_DRIVER,
147
        'pylibmc': 'django.core.cache.backends.memcached.PyLibMCCache',
148
        'rediscache': REDIS_DRIVER,
149
        'redis': REDIS_DRIVER,
150
        'rediss': REDIS_DRIVER,
151
    }
152
    _CACHE_BASE_OPTIONS = [
18✔
153
        'TIMEOUT',
154
        'KEY_PREFIX',
155
        'VERSION',
156
        'KEY_FUNCTION',
157
        'BINARY',
158
    ]
159

160
    DEFAULT_EMAIL_ENV = 'EMAIL_URL'
18✔
161
    EMAIL_SCHEMES = {
18✔
162
        'smtp': 'django.core.mail.backends.smtp.EmailBackend',
163
        'smtps': 'django.core.mail.backends.smtp.EmailBackend',
164
        'smtp+tls': 'django.core.mail.backends.smtp.EmailBackend',
165
        'smtp+ssl': 'django.core.mail.backends.smtp.EmailBackend',
166
        'consolemail': 'django.core.mail.backends.console.EmailBackend',
167
        'filemail': 'django.core.mail.backends.filebased.EmailBackend',
168
        'memorymail': 'django.core.mail.backends.locmem.EmailBackend',
169
        'dummymail': 'django.core.mail.backends.dummy.EmailBackend'
170
    }
171
    _EMAIL_BASE_OPTIONS = ['EMAIL_USE_TLS', 'EMAIL_USE_SSL']
18✔
172

173
    DEFAULT_SEARCH_ENV = 'SEARCH_URL'
18✔
174
    SEARCH_SCHEMES = {
18✔
175
        "elasticsearch": "haystack.backends.elasticsearch_backend."
176
                         "ElasticsearchSearchEngine",
177
        "elasticsearch2": "haystack.backends.elasticsearch2_backend."
178
                          "Elasticsearch2SearchEngine",
179
        "elasticsearch5": "haystack.backends.elasticsearch5_backend."
180
                          "Elasticsearch5SearchEngine",
181
        "elasticsearch7": "haystack.backends.elasticsearch7_backend."
182
                          "Elasticsearch7SearchEngine",
183
        "solr": "haystack.backends.solr_backend.SolrEngine",
184
        "whoosh": "haystack.backends.whoosh_backend.WhooshEngine",
185
        "xapian": "haystack.backends.xapian_backend.XapianEngine",
186
        "simple": "haystack.backends.simple_backend.SimpleEngine",
187
    }
188
    ELASTICSEARCH_FAMILY = [scheme + s for scheme in SEARCH_SCHEMES
18!
189
                            if scheme.startswith("elasticsearch")
190
                            for s in ('', 's')]
191
    CLOUDSQL = 'cloudsql'
18✔
192

193
    DEFAULT_CHANNELS_ENV = "CHANNELS_URL"
18✔
194
    CHANNELS_SCHEMES = {
18✔
195
        "inmemory": "channels.layers.InMemoryChannelLayer",
196
        "redis": "channels_redis.core.RedisChannelLayer",
197
        "redis+pubsub": "channels_redis.pubsub.RedisPubSubChannelLayer"
198
    }
199

200
    def __init__(self, **scheme):
18✔
201
        self.smart_cast = True
18✔
202
        self.escape_proxy = False
18✔
203
        self.prefix = ""
18✔
204
        self.scheme = scheme
18✔
205

206
    def __call__(self, var, cast=None, default=NOTSET, parse_default=False):
18✔
207
        return self.get_value(
18✔
208
            var,
209
            cast=cast,
210
            default=default,
211
            parse_default=parse_default
212
        )
213

214
    def __contains__(self, var):
18✔
215
        return var in self.ENVIRON
18✔
216

217
    def str(self, var, default=NOTSET, multiline=False):
18✔
218
        """
219
        :rtype: str
220
        """
221
        value = self.get_value(var, cast=str, default=default)
18✔
222
        if multiline:
18✔
223
            return re.sub(r'(\\r)?\\n', r'\n', value)
18✔
224
        return value
18✔
225

226
    def bytes(self, var, default=NOTSET, encoding='utf8'):
18✔
227
        """
228
        :rtype: bytes
229
        """
230
        value = self.get_value(var, cast=str, default=default)
18✔
231
        if hasattr(value, 'encode'):
18✔
232
            return value.encode(encoding)
18✔
233
        return value
18✔
234

235
    def bool(self, var, default=NOTSET):
18✔
236
        """
237
        :rtype: bool
238
        """
239
        return self.get_value(var, cast=bool, default=default)
18✔
240

241
    def int(self, var, default=NOTSET):
18✔
242
        """
243
        :rtype: int
244
        """
245
        return self.get_value(var, cast=int, default=default)
18✔
246

247
    def float(self, var, default=NOTSET):
18✔
248
        """
249
        :rtype: float
250
        """
251
        return self.get_value(var, cast=float, default=default)
18✔
252

253
    def json(self, var, default=NOTSET):
18✔
254
        """
255
        :returns: Json parsed
256
        """
257
        return self.get_value(var, cast=json.loads, default=default)
18✔
258

259
    def list(self, var, cast=None, default=NOTSET):
18✔
260
        """
261
        :rtype: list
262
        """
263
        return self.get_value(
18✔
264
            var,
265
            cast=list if not cast else [cast],
266
            default=default
267
        )
268

269
    def tuple(self, var, cast=None, default=NOTSET):
18✔
270
        """
271
        :rtype: tuple
272
        """
273
        return self.get_value(
18✔
274
            var,
275
            cast=tuple if not cast else (cast,),
276
            default=default
277
        )
278

279
    def dict(self, var, cast=dict, default=NOTSET):
18✔
280
        """
281
        :rtype: dict
282
        """
283
        return self.get_value(var, cast=cast, default=default)
18✔
284

285
    def url(self, var, default=NOTSET):
18✔
286
        """
287
        :rtype: urllib.parse.ParseResult
288
        """
289
        return self.get_value(
18✔
290
            var,
291
            cast=urlparse,
292
            default=default,
293
            parse_default=True
294
        )
295

296
    def db_url(self, var=DEFAULT_DATABASE_ENV, default=NOTSET, engine=None):
18✔
297
        """Returns a config dictionary, defaulting to DATABASE_URL.
298

299
        The db method is an alias for db_url.
300

301
        :rtype: dict
302
        """
303
        return self.db_url_config(
18✔
304
            self.get_value(var, default=default),
305
            engine=engine
306
        )
307

308
    db = db_url
18✔
309

310
    def cache_url(self, var=DEFAULT_CACHE_ENV, default=NOTSET, backend=None):
18✔
311
        """Returns a config dictionary, defaulting to CACHE_URL.
312

313
        The cache method is an alias for cache_url.
314

315
        :rtype: dict
316
        """
317
        return self.cache_url_config(
18✔
318
            self.url(var, default=default),
319
            backend=backend
320
        )
321

322
    cache = cache_url
18✔
323

324
    def email_url(self, var=DEFAULT_EMAIL_ENV, default=NOTSET, backend=None):
18✔
325
        """Returns a config dictionary, defaulting to EMAIL_URL.
326

327
        The email method is an alias for email_url.
328

329
        :rtype: dict
330
        """
331
        return self.email_url_config(
18✔
332
            self.url(var, default=default),
333
            backend=backend
334
        )
335

336
    email = email_url
18✔
337

338
    def search_url(self, var=DEFAULT_SEARCH_ENV, default=NOTSET, engine=None):
18✔
339
        """Returns a config dictionary, defaulting to SEARCH_URL.
340

341
        :rtype: dict
342
        """
343
        return self.search_url_config(
×
344
            self.url(var, default=default),
345
            engine=engine
346
        )
347

348
    def channels_url(self, var=DEFAULT_CHANNELS_ENV, default=NOTSET,
18✔
349
                     backend=None):
350
        """Returns a config dictionary, defaulting to CHANNELS_URL.
351

352
        :rtype: dict
353
        """
NEW
354
        return self.channels_url_config(
×
355
            self.url(var, default=default),
356
            backend=backend
357
        )
358

359
    channels = channels_url
18✔
360

361
    def path(self, var, default=NOTSET, **kwargs):
18✔
362
        """
363
        :rtype: Path
364
        """
365
        return Path(self.get_value(var, default=default), **kwargs)
18✔
366

367
    def get_value(self, var, cast=None, default=NOTSET, parse_default=False):
18✔
368
        """Return value for given environment variable.
369

370
        :param str var:
371
            Name of variable.
372
        :param collections.abc.Callable or None cast:
373
            Type to cast return value as.
374
        :param default:
375
             If var not present in environ, return this instead.
376
        :param bool parse_default:
377
            Force to parse default.
378
        :returns: Value from environment or default (if set).
379
        :rtype: typing.IO[typing.Any]
380
        """
381

382
        logger.debug(
18✔
383
            "get '%s' casted as '%s' with default '%s'",
384
            var, cast, default)
385

386
        var_name = f'{self.prefix}{var}'
18✔
387
        if var_name in self.scheme:
18✔
388
            var_info = self.scheme[var_name]
18✔
389

390
            try:
18✔
391
                has_default = len(var_info) == 2
18✔
392
            except TypeError:
18✔
393
                has_default = False
18✔
394

395
            if has_default:
18✔
396
                if not cast:
18✔
397
                    cast = var_info[0]
18✔
398

399
                if default is self.NOTSET:
18!
400
                    try:
18✔
401
                        default = var_info[1]
18✔
402
                    except IndexError:
×
403
                        pass
×
404
            else:
405
                if not cast:
18✔
406
                    cast = var_info
18✔
407

408
        try:
18✔
409
            value = self.ENVIRON[var_name]
18✔
410
        except KeyError as exc:
18✔
411
            if default is self.NOTSET:
18✔
412
                error_msg = f'Set the {var_name} environment variable'
18✔
413
                raise ImproperlyConfigured(error_msg) from exc
18✔
414

415
            value = default
18✔
416

417
        # Resolve any proxied values
418
        prefix = b'$' if isinstance(value, bytes) else '$'
18✔
419
        escape = rb'\$' if isinstance(value, bytes) else r'\$'
18✔
420
        if hasattr(value, 'startswith') and value.startswith(prefix):
18✔
421
            value = value.lstrip(prefix)
18✔
422
            value = self.get_value(value, cast=cast, default=default)
18✔
423

424
        if self.escape_proxy and hasattr(value, 'replace'):
18✔
425
            value = value.replace(escape, prefix)
18✔
426

427
        # Smart casting
428
        if self.smart_cast:
18!
429
            if cast is None and default is not None and \
18✔
430
                    not isinstance(default, NoValue):
431
                cast = type(default)
18✔
432

433
        value = None if default is None and value == '' else value
18✔
434

435
        if value != default or (parse_default and value is not None):
18✔
436
            value = self.parse_value(value, cast)
18✔
437

438
        return value
18✔
439

440
    @classmethod
18✔
441
    def parse_value(cls, value, cast):
18✔
442
        """Parse and cast provided value
443

444
        :param value: Stringed value.
445
        :param cast: Type to cast return value as.
446

447
        :returns: Casted value
448
        """
449
        if cast is None:
18✔
450
            return value
18✔
451
        if cast is bool:
18✔
452
            try:
18✔
453
                value = int(value) != 0
18✔
454
            except ValueError:
18✔
455
                value = value.lower().strip() in cls.BOOLEAN_TRUE_STRINGS
18✔
456
        elif isinstance(cast, list):
18✔
457
            value = list(map(cast[0], [x for x in value.split(',') if x]))
18!
458
        elif isinstance(cast, tuple):
18✔
459
            val = value.strip('(').strip(')').split(',')
18✔
460
            value = tuple(map(cast[0], [x for x in val if x]))
18!
461
        elif isinstance(cast, dict):
18✔
462
            key_cast = cast.get('key', str)
18✔
463
            value_cast = cast.get('value', str)
18✔
464
            value_cast_by_key = cast.get('cast', {})
18✔
465
            value = dict(map(
18✔
466
                lambda kv: (
467
                    key_cast(kv[0]),
468
                    cls.parse_value(
469
                        kv[1],
470
                        value_cast_by_key.get(kv[0], value_cast)
471
                    )
472
                ),
473
                [val.split('=') for val in value.split(';') if val]
474
            ))
475
        elif cast is dict:
18✔
476
            value = dict([v.split('=', 1) for v in value.split(',') if v])
18!
477
        elif cast is list:
18✔
478
            value = [x for x in value.split(',') if x]
18!
479
        elif cast is tuple:
18✔
480
            val = value.strip('(').strip(')').split(',')
18✔
481
            # pylint: disable=consider-using-generator
482
            value = tuple([x for x in val if x])
18!
483
        elif cast is float:
18✔
484
            # clean string
485
            float_str = re.sub(r'[^\d,.-]', '', value)
18✔
486
            # split for avoid thousand separator and different
487
            # locale comma/dot symbol
488
            parts = re.split(r'[,.]', float_str)
18✔
489
            if len(parts) == 1:
18✔
490
                float_str = parts[0]
18✔
491
            else:
492
                float_str = f"{''.join(parts[0:-1])}.{parts[-1]}"
18✔
493
            value = float(float_str)
18✔
494
        else:
495
            value = cast(value)
18✔
496
        return value
18✔
497

498
    @classmethod
18✔
499
    # pylint: disable=too-many-statements
500
    def db_url_config(cls, url, engine=None):
18✔
501
        # pylint: enable-msg=too-many-statements
502
        """Parse an arbitrary database URL.
503

504
        Supports the following URL schemas:
505

506
        * PostgreSQL: ``postgres[ql]?://`` or ``p[g]?sql://``
507
        * PostGIS: ``postgis://``
508
        * MySQL: ``mysql://`` or ``mysql2://``
509
        * MySQL (GIS): ``mysqlgis://``
510
        * MySQL Connector Python from Oracle: ``mysql-connector://``
511
        * SQLite: ``sqlite://``
512
        * SQLite with SpatiaLite for GeoDjango: ``spatialite://``
513
        * Oracle: ``oracle://``
514
        * Microsoft SQL Server: ``mssql://``
515
        * PyODBC: ``pyodbc://``
516
        * Amazon Redshift: ``redshift://``
517
        * LDAP: ``ldap://``
518

519
        :param urllib.parse.ParseResult or str url:
520
            Database URL to parse.
521
        :param str or None engine:
522
            If None, the database engine is evaluates from the ``url``.
523
        :return: Parsed database URL.
524
        :rtype: dict
525
        """
526
        if not isinstance(url, cls.URL_CLASS):
18!
527
            if url == 'sqlite://:memory:':
18✔
528
                # this is a special case, because if we pass this URL into
529
                # urlparse, urlparse will choke trying to interpret "memory"
530
                # as a port number
531
                return {
18✔
532
                    'ENGINE': cls.DB_SCHEMES['sqlite'],
533
                    'NAME': ':memory:'
534
                }
535
                # note: no other settings are required for sqlite
536
            try:
18✔
537
                url = urlparse(url)
18✔
538
            # handle Invalid IPv6 URL
539
            except ValueError:
18✔
540
                url = _urlparse_quote(url)
18✔
541

542
        config = {}
18✔
543

544
        # handle unexpected URL schemes with special characters
545
        if not url.path:
18✔
546
            url = _urlparse_quote(urlunparse(url))
18✔
547
        # Remove query strings.
548
        path = url.path[1:]
18✔
549
        path = unquote_plus(path.split('?', 2)[0])
18✔
550

551
        if url.scheme == 'sqlite':
18✔
552
            if path == '':
18✔
553
                # if we are using sqlite and we have no path, then assume we
554
                # want an in-memory database (this is the behaviour of
555
                # sqlalchemy)
556
                path = ':memory:'
18✔
557
            if url.netloc:
18✔
558
                warnings.warn(
18✔
559
                    f'SQLite URL contains host component {url.netloc!r}, '
560
                    'it will be ignored',
561
                    stacklevel=3
562
                )
563
        if url.scheme == 'ldap':
18✔
564
            path = f'{url.scheme}://{url.hostname}'
18✔
565
            if url.port:
18!
566
                path += f':{url.port}'
×
567

568
        user_host = url.netloc.rsplit('@', 1)
18✔
569
        if url.scheme in cls.POSTGRES_FAMILY and ',' in user_host[-1]:
18✔
570
            # Parsing postgres cluster dsn
571
            hinfo = list(
18✔
572
                itertools.zip_longest(
573
                    *(
574
                        host.rsplit(':', 1)
575
                        for host in user_host[-1].split(',')
576
                    )
577
                )
578
            )
579
            hostname = ','.join(hinfo[0])
18✔
580
            port = ','.join(filter(None, hinfo[1])) if len(hinfo) == 2 else ''
18✔
581
        else:
582
            hostname = url.hostname
18✔
583
            port = url.port
18✔
584

585
        # Update with environment configuration.
586
        config.update({
18✔
587
            'NAME': path or '',
588
            'USER': _cast_urlstr(url.username) or '',
589
            'PASSWORD': _cast_urlstr(url.password) or '',
590
            'HOST': hostname or '',
591
            'PORT': _cast_int(port) or '',
592
        })
593

594
        if (
18✔
595
                url.scheme in cls.POSTGRES_FAMILY and path.startswith('/')
596
                or cls.CLOUDSQL in path and path.startswith('/')
597
        ):
598
            config['HOST'], config['NAME'] = path.rsplit('/', 1)
18✔
599

600
        if url.scheme == 'oracle' and path == '':
18✔
601
            config['NAME'] = config['HOST']
18✔
602
            config['HOST'] = ''
18✔
603

604
        if url.scheme == 'oracle':
18✔
605
            # Django oracle/base.py strips port and fails on non-string value
606
            if not config['PORT']:
18✔
607
                del config['PORT']
18✔
608
            else:
609
                config['PORT'] = str(config['PORT'])
18✔
610

611
        if url.query:
18✔
612
            config_options = {}
18✔
613
            for k, v in parse_qs(url.query).items():
18✔
614
                if k.upper() in cls._DB_BASE_OPTIONS:
18✔
615
                    config.update({k.upper(): _cast(v[0])})
18✔
616
                else:
617
                    config_options.update({k: _cast_int(v[0])})
18✔
618
            config['OPTIONS'] = config_options
18✔
619

620
        if engine:
18✔
621
            config['ENGINE'] = engine
18✔
622
        else:
623
            config['ENGINE'] = url.scheme
18✔
624

625
        if config['ENGINE'] in cls.DB_SCHEMES:
18✔
626
            config['ENGINE'] = cls.DB_SCHEMES[config['ENGINE']]
18✔
627

628
        if not config.get('ENGINE', False):
18!
629
            warnings.warn(f'Engine not recognized from url: {config}')
×
630
            return {}
×
631

632
        return config
18✔
633

634
    @classmethod
18✔
635
    def cache_url_config(cls, url, backend=None):
18✔
636
        """Parse an arbitrary cache URL.
637

638
        :param urllib.parse.ParseResult or str url:
639
            Cache URL to parse.
640
        :param str or None backend:
641
            If None, the backend is evaluates from the ``url``.
642
        :return: Parsed cache URL.
643
        :rtype: dict
644
        """
645
        if not isinstance(url, cls.URL_CLASS):
18✔
646
            if not url:
18✔
647
                return {}
18✔
648
            url = urlparse(url)
18✔
649

650
        if url.scheme not in cls.CACHE_SCHEMES:
18✔
651
            raise ImproperlyConfigured(f'Invalid cache schema {url.scheme}')
18✔
652

653
        location = url.netloc.split(',')
18✔
654
        if len(location) == 1:
18✔
655
            location = location[0]
18✔
656

657
        config = {
18✔
658
            'BACKEND': cls.CACHE_SCHEMES[url.scheme],
659
            'LOCATION': location,
660
        }
661

662
        # Add the drive to LOCATION
663
        if url.scheme == 'filecache':
18✔
664
            config.update({
18✔
665
                'LOCATION': url.netloc + url.path,
666
            })
667

668
        # urlparse('pymemcache://127.0.0.1:11211')
669
        # => netloc='127.0.0.1:11211', path=''
670
        #
671
        # urlparse('pymemcache://memcached:11211/?key_prefix=ci')
672
        # => netloc='memcached:11211', path='/'
673
        #
674
        # urlparse('memcache:///tmp/memcached.sock')
675
        # => netloc='', path='/tmp/memcached.sock'
676
        if not url.netloc and url.scheme in ['memcache', 'pymemcache']:
18✔
677
            config.update({
18✔
678
                'LOCATION': 'unix:' + url.path,
679
            })
680
        elif url.scheme.startswith('redis'):
18✔
681
            if url.hostname:
18✔
682
                scheme = url.scheme.replace('cache', '')
18✔
683
            else:
684
                scheme = 'unix'
18✔
685
            locations = [scheme + '://' + loc + url.path
18!
686
                         for loc in url.netloc.split(',')]
687
            if len(locations) == 1:
18✔
688
                config['LOCATION'] = locations[0]
18✔
689
            else:
690
                config['LOCATION'] = locations
18✔
691

692
        if url.query:
18✔
693
            config_options = {}
18✔
694
            for k, v in parse_qs(url.query).items():
18✔
695
                opt = {k.upper(): _cast(v[0])}
18✔
696
                if k.upper() in cls._CACHE_BASE_OPTIONS:
18✔
697
                    config.update(opt)
18✔
698
                else:
699
                    config_options.update(opt)
18✔
700
            config['OPTIONS'] = config_options
18✔
701

702
        if backend:
18✔
703
            config['BACKEND'] = backend
18✔
704

705
        return config
18✔
706

707
    @classmethod
18✔
708
    def email_url_config(cls, url, backend=None):
18✔
709
        """Parse an arbitrary email URL.
710

711
        :param urllib.parse.ParseResult or str url:
712
            Email URL to parse.
713
        :param str or None backend:
714
            If None, the backend is evaluates from the ``url``.
715
        :return: Parsed email URL.
716
        :rtype: dict
717
        """
718

719
        config = {}
18✔
720

721
        url = urlparse(url) if not isinstance(url, cls.URL_CLASS) else url
18✔
722

723
        # Remove query strings
724
        path = url.path[1:]
18✔
725
        path = unquote_plus(path.split('?', 2)[0])
18✔
726

727
        # Update with environment configuration
728
        config.update({
18✔
729
            'EMAIL_FILE_PATH': path,
730
            'EMAIL_HOST_USER': _cast_urlstr(url.username),
731
            'EMAIL_HOST_PASSWORD': _cast_urlstr(url.password),
732
            'EMAIL_HOST': url.hostname,
733
            'EMAIL_PORT': _cast_int(url.port),
734
        })
735

736
        if backend:
18✔
737
            config['EMAIL_BACKEND'] = backend
18✔
738
        elif url.scheme not in cls.EMAIL_SCHEMES:
18!
739
            raise ImproperlyConfigured(f'Invalid email schema {url.scheme}')
×
740
        elif url.scheme in cls.EMAIL_SCHEMES:
18!
741
            config['EMAIL_BACKEND'] = cls.EMAIL_SCHEMES[url.scheme]
18✔
742

743
        if url.scheme in ('smtps', 'smtp+tls'):
18!
744
            config['EMAIL_USE_TLS'] = True
18✔
745
        elif url.scheme == 'smtp+ssl':
×
746
            config['EMAIL_USE_SSL'] = True
×
747

748
        if url.query:
18!
749
            config_options = {}
×
750
            for k, v in parse_qs(url.query).items():
×
751
                opt = {k.upper(): _cast_int(v[0])}
×
752
                if k.upper() in cls._EMAIL_BASE_OPTIONS:
×
753
                    config.update(opt)
×
754
                else:
755
                    config_options.update(opt)
×
756
            config['OPTIONS'] = config_options
×
757

758
        return config
18✔
759

760
    @classmethod
18✔
761
    def channels_url_config(cls, url, backend=None):
18✔
762
        """Parse an arbitrary channels URL.
763

764
        :param urllib.parse.ParseResult or str url:
765
            Email URL to parse.
766
        :param str or None backend:
767
            If None, the backend is evaluates from the ``url``.
768
        :return: Parsed channels URL.
769
        :rtype: dict
770
        """
771
        config = {}
18✔
772
        url = urlparse(url) if not isinstance(url, cls.URL_CLASS) else url
18✔
773

774
        if backend:
18!
NEW
775
            config["BACKEND"] = backend
×
776
        elif url.scheme not in cls.CHANNELS_SCHEMES:
18!
NEW
777
            raise ImproperlyConfigured(f"Invalid channels schema {url.scheme}")
×
778
        else:
779
            config["BACKEND"] = cls.CHANNELS_SCHEMES[url.scheme]
18✔
780
            if url.scheme in ("redis", "redis+pubsub"):
18✔
781
                config["CONFIG"] = {
18✔
782
                    "hosts": [url._replace(scheme="redis").geturl()]
783
                }
784

785
        return config
18✔
786

787
    @classmethod
18✔
788
    def _parse_common_search_params(cls, url):
18✔
789
        cfg = {}
18✔
790
        prs = {}
18✔
791

792
        if not url.query or str(url.query) == '':
18✔
793
            return cfg, prs
18✔
794

795
        prs = parse_qs(url.query)
18✔
796
        if 'EXCLUDED_INDEXES' in prs:
18✔
797
            cfg['EXCLUDED_INDEXES'] = prs['EXCLUDED_INDEXES'][0].split(',')
18✔
798
        if 'INCLUDE_SPELLING' in prs:
18✔
799
            val = prs['INCLUDE_SPELLING'][0]
18✔
800
            cfg['INCLUDE_SPELLING'] = cls.parse_value(val, bool)
18✔
801
        if 'BATCH_SIZE' in prs:
18✔
802
            cfg['BATCH_SIZE'] = cls.parse_value(prs['BATCH_SIZE'][0], int)
18✔
803
        return cfg, prs
18✔
804

805
    @classmethod
18✔
806
    def _parse_elasticsearch_search_params(cls, url, path, secure, params):
18✔
807
        cfg = {}
18✔
808
        split = path.rsplit('/', 1)
18✔
809

810
        if len(split) > 1:
18!
811
            path = '/'.join(split[:-1])
×
812
            index = split[-1]
×
813
        else:
814
            path = ""
18✔
815
            index = split[0]
18✔
816

817
        cfg['URL'] = urlunparse(
18✔
818
            ('https' if secure else 'http', url[1], path, '', '', '')
819
        )
820
        if 'TIMEOUT' in params:
18✔
821
            cfg['TIMEOUT'] = cls.parse_value(params['TIMEOUT'][0], int)
18✔
822
        if 'KWARGS' in params:
18!
823
            cfg['KWARGS'] = params['KWARGS'][0]
×
824
        cfg['INDEX_NAME'] = index
18✔
825
        return cfg
18✔
826

827
    @classmethod
18✔
828
    def _parse_solr_search_params(cls, url, path, params):
18✔
829
        cfg = {}
18✔
830
        cfg['URL'] = urlunparse(('http',) + url[1:2] + (path,) + ('', '', ''))
18✔
831
        if 'TIMEOUT' in params:
18✔
832
            cfg['TIMEOUT'] = cls.parse_value(params['TIMEOUT'][0], int)
18✔
833
        if 'KWARGS' in params:
18!
834
            cfg['KWARGS'] = params['KWARGS'][0]
×
835
        return cfg
18✔
836

837
    @classmethod
18✔
838
    def _parse_whoosh_search_params(cls, params):
18✔
839
        cfg = {}
18✔
840
        if 'STORAGE' in params:
18✔
841
            cfg['STORAGE'] = params['STORAGE'][0]
18✔
842
        if 'POST_LIMIT' in params:
18✔
843
            cfg['POST_LIMIT'] = cls.parse_value(params['POST_LIMIT'][0], int)
18✔
844
        return cfg
18✔
845

846
    @classmethod
18✔
847
    def _parse_xapian_search_params(cls, params):
18✔
848
        cfg = {}
18✔
849
        if 'FLAGS' in params:
18✔
850
            cfg['FLAGS'] = params['FLAGS'][0]
18✔
851
        return cfg
18✔
852

853
    @classmethod
18✔
854
    def search_url_config(cls, url, engine=None):
18✔
855
        """Parse an arbitrary search URL.
856

857
        :param urllib.parse.ParseResult or str url:
858
            Search URL to parse.
859
        :param str or None engine:
860
            If None, the engine is evaluating from the ``url``.
861
        :return: Parsed search URL.
862
        :rtype: dict
863
        """
864
        config = {}
18✔
865
        url = urlparse(url) if not isinstance(url, cls.URL_CLASS) else url
18✔
866

867
        # Remove query strings.
868
        path = unquote_plus(url.path[1:].split('?', 2)[0])
18✔
869

870
        scheme = url.scheme
18✔
871
        secure = False
18✔
872
        # elasticsearch supports secure schemes, similar to http -> https
873
        if scheme in cls.ELASTICSEARCH_FAMILY and scheme.endswith('s'):
18✔
874
            scheme = scheme[:-1]
18✔
875
            secure = True
18✔
876
        if scheme not in cls.SEARCH_SCHEMES:
18!
877
            raise ImproperlyConfigured(f'Invalid search schema {url.scheme}')
×
878
        config['ENGINE'] = cls.SEARCH_SCHEMES[scheme]
18✔
879

880
        # check commons params
881
        cfg, params = cls._parse_common_search_params(url)
18✔
882
        config.update(cfg)
18✔
883

884
        if url.scheme == 'simple':
18✔
885
            return config
18✔
886

887
        # remove trailing slash
888
        if path.endswith('/'):
18!
889
            path = path[:-1]
×
890

891
        if url.scheme == 'solr':
18✔
892
            config.update(cls._parse_solr_search_params(url, path, params))
18✔
893
            return config
18✔
894

895
        if url.scheme in cls.ELASTICSEARCH_FAMILY:
18✔
896
            config.update(cls._parse_elasticsearch_search_params(
18✔
897
                url, path, secure, params))
898
            return config
18✔
899

900
        config['PATH'] = '/' + path
18✔
901

902
        if url.scheme == 'whoosh':
18✔
903
            config.update(cls._parse_whoosh_search_params(params))
18✔
904
        elif url.scheme == 'xapian':
18!
905
            config.update(cls._parse_xapian_search_params(params))
18✔
906

907
        if engine:
18!
908
            config['ENGINE'] = engine
×
909

910
        return config
18✔
911

912
    @classmethod
18✔
913
    def read_env(cls, env_file=None, overwrite=False, parse_comments=False,
18✔
914
                 encoding='utf8', **overrides):
915
        r"""Read a .env file into os.environ.
916

917
        If not given a path to a dotenv path, does filthy magic stack
918
        backtracking to find the dotenv in the same directory as the file that
919
        called ``read_env``.
920

921
        Existing environment variables take precedent and are NOT overwritten
922
        by the file content. ``overwrite=True`` will force an overwrite of
923
        existing environment variables.
924

925
        Refs:
926

927
        * https://wellfire.co/learn/easier-12-factor-django
928

929
        :param env_file: The path to the ``.env`` file your application should
930
            use. If a path is not provided, `read_env` will attempt to import
931
            the Django settings module from the Django project root.
932
        :param overwrite: ``overwrite=True`` will force an overwrite of
933
            existing environment variables.
934
        :param parse_comments: Determines whether to recognize and ignore
935
           inline comments in the .env file. Default is False.
936
        :param encoding: The encoding to use when reading the environment file.
937
        :param \**overrides: Any additional keyword arguments provided directly
938
            to read_env will be added to the environment. If the key matches an
939
            existing environment variable, the value will be overridden.
940
        """
941
        if env_file is None:
18!
942
            # pylint: disable=protected-access
943
            frame = sys._getframe()
×
944
            env_file = os.path.join(
×
945
                os.path.dirname(frame.f_back.f_code.co_filename),
946
                '.env'
947
            )
948
            if not os.path.exists(env_file):
×
949
                logger.info(
×
950
                    "%s doesn't exist - if you're not configuring your "
951
                    "environment separately, create one.", env_file)
952
                return
×
953

954
        try:
18✔
955
            if isinstance(env_file, Openable):
18!
956
                # Python 3.5 support (wrap path with str).
957
                with open(str(env_file), encoding=encoding) as f:
18✔
958
                    content = f.read()
18✔
959
            else:
960
                with env_file as f:
×
961
                    content = f.read()
×
962
        except OSError:
×
963
            logger.info(
×
964
                "%s not found - if you're not configuring your "
965
                "environment separately, check this.", env_file)
966
            return
×
967

968
        logger.debug('Read environment variables from: %s', env_file)
18✔
969

970
        def _keep_escaped_format_characters(match):
18✔
971
            """Keep escaped newline/tabs in quoted strings"""
972
            escaped_char = match.group(1)
18✔
973
            if escaped_char in 'rnt':
18!
974
                return '\\' + escaped_char
18✔
975
            return escaped_char
×
976

977
        for line in content.splitlines():
18✔
978
            m1 = re.match(r'\A(?:export )?([A-Za-z_0-9]+)=(.*)\Z', line)
18✔
979
            if m1:
18✔
980

981
                # Example:
982
                #
983
                # line: KEY_499=abc#def
984
                # key:  KEY_499
985
                # val:  abc#def
986
                key, val = m1.group(1), m1.group(2)
18✔
987

988
                if not parse_comments:
18✔
989
                    # Default behavior
990
                    #
991
                    # Look for value in single quotes
992
                    m2 = re.match(r"\A'(.*)'\Z", val)
18✔
993
                    if m2:
18✔
994
                        val = m2.group(1)
18✔
995
                else:
996
                    # Ignore post-# comments (outside quotes).
997
                    # Something like ['val'  # comment] becomes ['val'].
998
                    m2 = re.match(r"\A\s*'(?<!\\)(.*)'\s*(#.*\s*)?\Z", val)
18✔
999
                    if m2:
18✔
1000
                        val = m2.group(1)
18✔
1001
                    else:
1002
                        # For no quotes, find value, ignore comments
1003
                        # after the first #
1004
                        m2a = re.match(r"\A(.*?)(#.*\s*)?\Z", val)
18✔
1005
                        if m2a:
18!
1006
                            val = m2a.group(1)
18✔
1007

1008
                # Look for value in double quotes
1009
                m3 = re.match(r'\A"(.*)"\Z', val)
18✔
1010
                if m3:
18✔
1011
                    val = re.sub(r'\\(.)', _keep_escaped_format_characters,
18✔
1012
                                 m3.group(1))
1013

1014
                overrides[key] = str(val)
18✔
1015
            elif not line or line.startswith('#'):
18!
1016
                # ignore warnings for empty line-breaks or comments
1017
                pass
18✔
1018
            else:
1019
                logger.warning('Invalid line: %s', line)
×
1020

1021
        def set_environ(envval):
18✔
1022
            """Return lambda to set environ.
1023

1024
             Use setdefault unless overwrite is specified.
1025
             """
1026
            if overwrite:
18✔
1027
                return lambda k, v: envval.update({k: str(v)})
18✔
1028
            return lambda k, v: envval.setdefault(k, str(v))
18✔
1029

1030
        setenv = set_environ(cls.ENVIRON)
18✔
1031

1032
        for key, value in overrides.items():
18✔
1033
            setenv(key, value)
18✔
1034

1035

1036
class FileAwareEnv(Env):
18✔
1037
    """
1038
    First look for environment variables with ``_FILE`` appended. If found,
1039
    their contents will be read from the file system and used instead.
1040

1041
    Use as a drop-in replacement for the standard ``environ.Env``:
1042

1043
    .. code-block:: python
1044

1045
        python env = environ.FileAwareEnv()
1046

1047
    For example, if a ``SECRET_KEY_FILE`` environment variable was set,
1048
    ``env("SECRET_KEY")`` would find the related variable, returning the file
1049
    contents rather than ever looking up a ``SECRET_KEY`` environment variable.
1050
    """
1051
    ENVIRON = FileAwareMapping()
18✔
1052

1053

1054
class Path:
18✔
1055
    """Inspired to Django Two-scoops, handling File Paths in Settings."""
1056

1057
    def path(self, *paths, **kwargs):
18✔
1058
        """Create new Path based on self.root and provided paths.
1059

1060
        :param paths: List of sub paths
1061
        :param kwargs: required=False
1062
        :rtype: Path
1063
        """
1064
        return self.__class__(self.__root__, *paths, **kwargs)
18✔
1065

1066
    def file(self, name, *args, **kwargs):
18✔
1067
        r"""Open a file.
1068

1069
        :param str name: Filename appended to :py:attr:`~root`
1070
        :param \*args: ``*args`` passed to :py:func:`open`
1071
        :param \**kwargs: ``**kwargs`` passed to :py:func:`open`
1072
        :rtype: typing.IO[typing.Any]
1073
        """
1074
        # pylint: disable=unspecified-encoding
1075
        return open(self(name), *args, **kwargs)
×
1076

1077
    @property
18✔
1078
    def root(self):
18✔
1079
        """Current directory for this Path"""
1080
        return self.__root__
18✔
1081

1082
    # pylint: disable=keyword-arg-before-vararg
1083
    def __init__(self, start='', *paths, **kwargs):
18✔
1084

1085
        super().__init__()
18✔
1086

1087
        if kwargs.get('is_file', False):
18✔
1088
            start = os.path.dirname(start)
18✔
1089

1090
        self.__root__ = self._absolute_join(start, *paths, **kwargs)
18✔
1091

1092
    def __call__(self, *paths, **kwargs):
18✔
1093
        """Retrieve the absolute path, with appended paths
1094

1095
        :param paths: List of sub path of self.root
1096
        :param kwargs: required=False
1097
        """
1098
        return self._absolute_join(self.__root__, *paths, **kwargs)
18✔
1099

1100
    def __eq__(self, other):
18✔
1101
        if isinstance(other, Path):
18✔
1102
            return self.__root__ == other.__root__
18✔
1103
        return self.__root__ == other
18✔
1104

1105
    def __ne__(self, other):
18✔
1106
        return not self.__eq__(other)
18✔
1107

1108
    def __add__(self, other):
18✔
1109
        if not isinstance(other, Path):
18!
1110
            return Path(self.__root__, other)
18✔
1111
        return Path(self.__root__, other.__root__)
×
1112

1113
    def __sub__(self, other):
18✔
1114
        if isinstance(other, int):
18✔
1115
            return self.path('../' * other)
18✔
1116
        if isinstance(other, str) and self.__root__.endswith(other):
18✔
1117
            return Path(self.__root__.rstrip(other))
18✔
1118

1119
        raise TypeError(
18✔
1120
            "unsupported operand type(s) for -: '{self}' and '{other}' "
1121
            "unless value of {self} ends with value of {other}".format(
1122
                self=type(self), other=type(other)
1123
            )
1124
        )
1125

1126
    def __invert__(self):
18✔
1127
        return self.path('..')
18✔
1128

1129
    def __contains__(self, item):
18✔
1130
        base_path = self.__root__
18✔
1131
        if len(base_path) > 1:
18✔
1132
            base_path = os.path.join(base_path, '')
18✔
1133
        return item.__root__.startswith(base_path)
18✔
1134

1135
    def __repr__(self):
18✔
1136
        return f'<Path:{self.__root__}>'
18✔
1137

1138
    def __str__(self):
18✔
1139
        return self.__root__
18✔
1140

1141
    def __unicode__(self):
18✔
1142
        return self.__str__()
×
1143

1144
    def __getitem__(self, *args, **kwargs):
18✔
1145
        return self.__str__().__getitem__(*args, **kwargs)
18✔
1146

1147
    def __fspath__(self):
18✔
1148
        return self.__str__()
18✔
1149

1150
    def rfind(self, *args, **kwargs):
18✔
1151
        """Proxy method to :py:func:`str.rfind`"""
1152
        return str(self).rfind(*args, **kwargs)
18✔
1153

1154
    def find(self, *args, **kwargs):
18✔
1155
        """Proxy method to :py:func:`str.find`"""
1156
        return str(self).find(*args, **kwargs)
18✔
1157

1158
    @staticmethod
18✔
1159
    def _absolute_join(base, *paths, **kwargs):
18✔
1160
        absolute_path = os.path.abspath(os.path.join(base, *paths))
18✔
1161
        if kwargs.get('required', False) and not os.path.exists(absolute_path):
18✔
1162
            raise ImproperlyConfigured(
18✔
1163
                f'Create required path: {absolute_path}'
1164
            )
1165
        return absolute_path
18✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc