• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

IdentityPython / pyFF / 513

24 Jan 2019 - 20:49 coverage: 82.502%. First build
513

Pull #158

travis-ci

web-flow
utf-8 border in store implementation
Pull Request #158: Py3 compat

135 of 157 new or added lines in 11 files covered. (85.99%)

2744 of 3326 relevant lines covered (82.5%)

2.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.59
/src/pyff/utils.py
1
# coding=utf-8
2

3

4
"""
3×
5

6
This module contains various utilities.
7

8
"""
9
import hashlib
3×
10
import io
3×
11
import tempfile
3×
12
from datetime import timedelta, datetime
3×
13
from email.utils import parsedate
3×
14
from threading import local
3×
15
from time import gmtime, strftime
3×
16
from six.moves.urllib_parse import urlparse, quote_plus
3×
17
from itertools import chain
3×
18
from six import StringIO
3×
19
import yaml
3×
20
import xmlsec
3×
21
import cherrypy
3×
22
import iso8601
3×
23
import os
3×
24
import pkg_resources
3×
25
import re
3×
26
from jinja2 import Environment, PackageLoader
3×
27
from lxml import etree
3×
28
from .constants import config, NS
3×
29
from .logs import get_log
3×
30
from .exceptions import *
3×
31
from .i18n import language
3×
32
import requests
3×
33
from requests_file import FileAdapter
3×
34
from requests_cache import CachedSession
3×
35
import base64
3×
36
import time
3×
37
from markupsafe import Markup
3×
38
import six
3×
39
import traceback
3×
40
from . import __version__
3×
41

42
etree.set_default_parser(etree.XMLParser(resolve_entities=False))
3×
43

44
__author__ = 'leifj'
3×
45

46
log = get_log(__name__)
3×
47

48
sentinel = object()
3×
49
thread_data = local()
3×
50

51

52
def xml_error(error_log, m=None):
3×
53
    def _f(x):
3×
54
        if ":WARNING:" in x:
3×
55
            return False
3×
56
        if m is not None and m not in x:
3×
57
            return False
3×
58
        return True
3×
59

60
    return "\n".join(filter(_f, ["%s" % e for e in error_log]))
3×
61

62

63
def debug_observer(e):
3×
64
    log.error(repr(e))
!
65

66

67
def trunc_str(x, l):
3×
68
    return (x[:l] + '..') if len(x) > l else x
3×
69

70

71
def resource_string(name, pfx=None):
3×
72
    """
73
Attempt to load and return the contents (as a string) of the resource named by
74
the first argument in the first location of:
75

76
# as name in the current directory
77
# as name in the `pfx` subdirectory of the current directory if provided
78
# as name relative to the package
79
# as pfx/name relative to the package
80

81
The last two alternatives is used to locate resources distributed in the package.
82
This includes certain XSLT and XSD files.
83

84
:param name: The string name of a resource
85
:param pfx: An optional prefix to use in searching for name
86

87
    """
88
    name = os.path.expanduser(name)
3×
89
    data = None
3×
90
    if os.path.exists(name):
3×
91
        with io.open(name) as fd:
3×
92
            data = fd.read()
3×
93
    elif pfx and os.path.exists(os.path.join(pfx, name)):
3×
94
        with io.open(os.path.join(pfx, name)) as fd:
3×
95
            data = fd.read()
3×
96
    elif pkg_resources.resource_exists(__name__, name):
3×
97
        data = pkg_resources.resource_string(__name__, name)
3×
98
    elif pfx and pkg_resources.resource_exists(__name__, "%s/%s" % (pfx, name)):
3×
99
        data = pkg_resources.resource_string(__name__, "%s/%s" % (pfx, name))
3×
100

101
    return data
3×
102

103

104
def resource_filename(name, pfx=None):
3×
105
    """
106
Attempt to find and return the filename of the resource named by the first argument
107
in the first location of:
108

109
# as name in the current directory
110
# as name in the `pfx` subdirectory of the current directory if provided
111
# as name relative to the package
112
# as pfx/name relative to the package
113

114
The last two alternatives is used to locate resources distributed in the package.
115
This includes certain XSLT and XSD files.
116

117
:param name: The string name of a resource
118
:param pfx: An optional prefix to use in searching for name
119

120
    """
121
    if os.path.exists(name):
3×
122
        return name
3×
123
    elif pfx and os.path.exists(os.path.join(pfx, name)):
3×
124
        return os.path.join(pfx, name)
3×
125
    elif pkg_resources.resource_exists(__name__, name):
3×
126
        return pkg_resources.resource_filename(__name__, name)
3×
127
    elif pfx and pkg_resources.resource_exists(__name__, "%s/%s" % (pfx, name)):
3×
128
        return pkg_resources.resource_filename(__name__, "%s/%s" % (pfx, name))
3×
129

130
    return None
3×
131

132

133
def totimestamp(dt, epoch=datetime(1970, 1, 1)):
3×
134
    epoch = epoch.replace(tzinfo=dt.tzinfo)
3×
135

136
    td = dt - epoch
3×
137
    ts = (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / 1e6
3×
138
    return int(ts)
3×
139

140

141
def dumptree(t, pretty_print=False, method='xml', xml_declaration=True):
3×
142
    """
143
Return a string representation of the tree, optionally pretty_print(ed) (default False)
144

145
:param t: An ElemenTree to serialize
146
    """
147
    return etree.tostring(t, encoding='UTF-8', method=method, xml_declaration=xml_declaration,
3×
148
                          pretty_print=pretty_print)
149

150

151
def iso_now():
3×
152
    """
153
Current time in ISO format
154
    """
155
    return iso_fmt()
3×
156

157

158
def iso_fmt(tstamp=None):
3×
159
    """
160
Timestamp in ISO format
161
    """
162
    return strftime("%Y-%m-%dT%H:%M:%SZ", gmtime(tstamp))
3×
163

164

165
def ts_now():
3×
166
    return int(time.time())
3×
167

168

169
def iso2datetime(s):
3×
170
    return iso8601.parse_date(s)
3×
171

172

173
def first_text(elt, tag, default=None):
3×
174
    for matching in elt.iter(tag):
3×
175
        return matching.text
3×
176
    return default
3×
177

178

179
class ResourceResolver(etree.Resolver):
3×
180
    def __init__(self):
3×
181
        super(ResourceResolver, self).__init__()
3×
182

183
    def resolve(self, system_url, public_id, context):
3×
184
        """
185
        Resolves URIs using the resource API
186
        """
187
        # log.debug("resolve SYSTEM URL' %s' for '%s'" % (system_url, public_id))
188
        path = system_url.split("/")
3×
189
        fn = path[len(path) - 1]
3×
190
        if pkg_resources.resource_exists(__name__, fn):
3×
191
            return self.resolve_file(pkg_resources.resource_stream(__name__, fn), context)
!
192
        elif pkg_resources.resource_exists(__name__, "schema/%s" % fn):
3×
193
            return self.resolve_file(pkg_resources.resource_stream(__name__, "schema/%s" % fn), context)
3×
194
        else:
195
            raise ValueError("Unable to locate %s" % fn)
!
196

197

198
def schema():
3×
199
    if not hasattr(thread_data, 'schema'):
3×
200
        thread_data.schema = None
3×
201

202
    if thread_data.schema is None:
3×
203
        try:
3×
204
            parser = etree.XMLParser(collect_ids=False, resolve_entities=False)
3×
205
            parser.resolvers.add(ResourceResolver())
3×
206
            st = etree.parse(pkg_resources.resource_stream(__name__, "schema/schema.xsd"), parser)
3×
207
            thread_data.schema = etree.XMLSchema(st)
3×
208
        except etree.XMLSchemaParseError as ex:
!
209
            log.error(xml_error(ex.error_log))
!
210
            raise ex
!
211
    return thread_data.schema
3×
212

213

214
def check_signature(t, key, only_one_signature=False):
3×
215
    if key is not None:
3×
216
        log.debug("verifying signature using %s" % key)
3×
217
        refs = xmlsec.verified(t, key, drop_signature=True)
3×
218
        if only_one_signature and len(refs) != 1:
3×
219
            raise MetadataException("XML metadata contains %d signatures - exactly 1 is required" % len(refs))
!
220
        t = refs[0]  # prevent wrapping attacks
3×
221

222
    return t
3×
223

224

225
# @cached(hash_key=lambda *args, **kwargs: hash(args[0]))
226
def validate_document(t):
3×
227
    schema().assertValid(t)
3×
228

229

230
def request_vhost(request):
3×
231
    return request.headers.get('X-Forwarded-Host', request.headers.get('Host', request.base))
3×
232

233

234
def request_scheme(request):
3×
235
    return request.headers.get('X-Forwarded-Proto', request.scheme)
3×
236

237

238
def safe_write(fn, data):
3×
239
    """Safely write data to a file with name fn
240
    :param fn: a filename
241
    :param data: some string data to write
242
    :return: True or False depending on the outcome of the write
243
    """
244
    tmpn = None
3×
245
    try:
3×
246
        fn = os.path.expanduser(fn)
3×
247
        dirname, basename = os.path.split(fn)
3×
248
        kwargs = dict(delete=False, prefix=".%s" % basename, dir=dirname)
3×
249
        if six.PY3:
3×
250
            kwargs['encoding'] = "utf-8"
3×
251
            mode = 'w+'
3×
252
        else:
NEW
253
            mode = 'w+b'
!
254

255
        if isinstance(data, six.binary_type):
3×
256
            data = data.decode('utf-8')
3×
257

258
        with tempfile.NamedTemporaryFile(mode, **kwargs) as tmp:
3×
259
            if six.PY2:
3×
NEW
260
                data = data.encode('utf-8')
!
261

262
            log.debug("safe writing {} chrs into {}".format(len(data), fn))
3×
263
            tmp.write(data)
3×
264
            tmpn = tmp.name
3×
265
        if os.path.exists(tmpn) and os.stat(tmpn).st_size > 0:
3×
266
            os.rename(tmpn, fn)
3×
267
            return True
3×
268
    except Exception as ex:
!
NEW
269
        log.debug(traceback.format_exc())
!
270
        log.error(ex)
!
271
    finally:
272
        if tmpn is not None and os.path.exists(tmpn):
3×
273
            try:
!
274
                os.unlink(tmpn)
!
275
            except Exception as ex:
!
276
                log.warn(ex)
!
277
    return False
!
278

279

280
site_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "site")
3×
281
env = Environment(loader=PackageLoader(__package__, 'templates'), extensions=['jinja2.ext.i18n'])
3×
282
getattr(env, 'install_gettext_callables')(language.gettext, language.ngettext, newstyle=True)
3×
283

284

285
def urlencode_filter(s):
3×
286
    if type(s) == 'Markup':
3×
287
        s = s.unescape()
!
288
    s = s.encode('utf8')
3×
289
    s = quote_plus(s)
3×
290
    return Markup(s)
3×
291

292

293
def truncate_filter(s, max_len=10):
3×
294
    if len(s) > max_len:
3×
295
        return s[0:max_len] + "..."
!
296
    else:
297
        return s
3×
298

299

300
def to_yaml_filter(pipeline):
3×
301
    print(pipeline)
3×
302
    out = StringIO()
3×
303
    yaml.dump(pipeline, stream=out)
3×
304
    return out.getvalue()
3×
305

306

307
env.filters['u'] = urlencode_filter
3×
308
env.filters['truncate'] = truncate_filter
3×
309
env.filters['to_yaml'] = to_yaml_filter
3×
310
env.filters['sha1'] = lambda x: hash_id(x, 'sha1', False)
3×
311

312

313
def template(name):
3×
314
    return env.get_template(name)
3×
315

316

317
def render_template(name, **kwargs):
3×
318
    kwargs.setdefault('http', cherrypy.request)
3×
319
    vhost = request_vhost(cherrypy.request)
3×
320
    kwargs.setdefault('vhost', vhost)
3×
321
    kwargs.setdefault('scheme', request_scheme(cherrypy.request))
3×
322
    kwargs.setdefault('brand', "pyFF @ %s" % vhost)
3×
323
    kwargs.setdefault('google_api_key', config.google_api_key)
3×
324
    kwargs.setdefault('_', _)
3×
325
    return template(name).render(**kwargs)
3×
326

327

328
def parse_date(s):
3×
329
    if s is None:
!
330
        return datetime.now()
!
331
    return datetime(*parsedate(s)[:6])
!
332

333

334
def root(t):
3×
335
    if hasattr(t, 'getroot') and hasattr(t.getroot, '__call__'):
3×
336
        return t.getroot()
3×
337
    else:
338
        return t
3×
339

340

341
def with_tree(elt, cb):
3×
342
    cb(elt)
3×
343
    if isinstance(elt.tag, six.string_types):
3×
344
        for child in list(elt):
3×
345
            with_tree(child, cb)
3×
346

347

348
def duration2timedelta(period):
3×
349
    regex = re.compile(
3×
350
        '(?P<sign>[-+]?)P(?:(?P<years>\d+)[Yy])?(?:(?P<months>\d+)[Mm])?(?:(?P<days>\d+)[Dd])?(?:T(?:(?P<hours>\d+)[Hh])?(?:(?P<minutes>\d+)[Mm])?(?:(?P<seconds>\d+)[Ss])?)?')
351

352
    # Fetch the match groups with default value of 0 (not None)
353
    m = regex.match(period)
3×
354
    if not m:
3×
355
        return None
3×
356

357
    duration = m.groupdict(0)
3×
358

359
    # Create the timedelta object from extracted groups
360
    delta = timedelta(days=int(duration['days']) + (int(duration['months']) * 30) + (int(duration['years']) * 365),
3×
361
                      hours=int(duration['hours']),
362
                      minutes=int(duration['minutes']),
363
                      seconds=int(duration['seconds']))
364

365
    if duration['sign'] == "-":
3×
366
        delta *= -1
3×
367

368
    return delta
3×
369

370

371
def filter_lang(elts, langs=None):
3×
372
    if langs is None or type(langs) is not list:
3×
373
        langs = ['en']
3×
374

375
    def _l(elt):
3×
376
        return elt.get("{http://www.w3.org/XML/1998/namespace}lang", "en") in langs
3×
377

378
    if elts is None:
3×
379
        return []
!
380

381
    lst = list(filter(_l, elts))
3×
382
    if lst:
3×
383
        return lst
3×
384
    else:
385
        return elts
3×
386

387

388
def xslt_transform(t, stylesheet, params=None):
3×
389
    if not params:
3×
390
        params = dict()
3×
391

392
    if not hasattr(thread_data, 'xslt'):
3×
393
        thread_data.xslt = dict()
3×
394

395
    transform = None
3×
396
    if stylesheet not in thread_data.xslt:
3×
397
        xsl = etree.fromstring(resource_string(stylesheet, "xslt"))
3×
398
        thread_data.xslt[stylesheet] = etree.XSLT(xsl)
3×
399
    transform = thread_data.xslt[stylesheet]
3×
400
    try:
3×
401
        return transform(t, **params)
3×
402
    except etree.XSLTApplyError as ex:
!
403
        for entry in transform.error_log:
!
404
            log.error('\tmessage from line %s, col %s: %s' % (entry.line, entry.column, entry.message))
!
405
            log.error('\tdomain: %s (%d)' % (entry.domain_name, entry.domain))
!
406
            log.error('\ttype: %s (%d)' % (entry.type_name, entry.type))
!
407
            log.error('\tlevel: %s (%d)' % (entry.level_name, entry.level))
!
408
            log.error('\tfilename: %s' % entry.filename)
!
409
        raise ex
!
410

411

412
def valid_until_ts(elt, default_ts):
3×
413
    ts = default_ts
3×
414
    valid_until = elt.get("validUntil", None)
3×
415
    if valid_until is not None:
3×
416
        dt = iso8601.parse_date(valid_until)
3×
417
        if dt is not None:
3×
418
            ts = totimestamp(dt)
3×
419

420
    cache_duration = elt.get("cacheDuration", None)
3×
421
    if cache_duration is not None:
3×
422
        dt = datetime.utcnow() + duration2timedelta(cache_duration)
3×
423
        if dt is not None:
3×
424
            ts = totimestamp(dt)
3×
425

426
    return ts
3×
427

428

429
def total_seconds(dt):
3×
430
    if hasattr(dt, "total_seconds"):
3×
431
        return dt.total_seconds()
3×
432
    else:
433
        return (dt.microseconds + (dt.seconds + dt.days * 24 * 3600) * 10 ** 6) / 10 ** 6
!
434

435

436
def etag(s):
3×
NEW
437
    return hex_digest(s, hn="sha256")
!
438

439

440
def hash_id(entity, hn='sha1', prefix=True):
3×
441
    entity_id = entity
3×
442
    if hasattr(entity, 'get'):
3×
443
        entity_id = entity.get('entityID')
3×
444

445
    hstr = hex_digest(entity_id, hn)
3×
446
    if prefix:
3×
447
        return "{%s}%s" % (hn, hstr)
3×
448
    else:
449
        return hstr
3×
450

451

452
def hex_digest(data, hn='sha1'):
3×
453
    if hn == 'null':
3×
454
        return data
3×
455

456
    if not hasattr(hashlib, hn):
3×
457
        raise ValueError("Unknown digest '%s'" % hn)
!
458

459
    if not isinstance(data, six.binary_type):
3×
460
        data = data.encode("utf-8")
3×
461

462
    m = getattr(hashlib, hn)()
3×
463
    m.update(data)
3×
464
    return m.hexdigest()
3×
465

466

467
def parse_xml(io, base_url=None):
3×
468
    return etree.parse(io, base_url=base_url, parser=etree.XMLParser(resolve_entities=False,collect_ids=False))
3×
469

470

471
def has_tag(t, tag):
3×
472
    tags = t.iter(tag)
3×
473
    return next(tags, sentinel) is not sentinel
3×
474

475

476
def url2host(url):
3×
477
    (host, sep, port) = urlparse(url).netloc.partition(':')
3×
478
    return host
3×
479

480

481
def subdomains(domain):
3×
482
    dl = []
3×
483
    dsplit = domain.split('.')
3×
484
    if len(dsplit) < 3:
3×
485
        dl.append(domain)
3×
486
    else:
487
        for i in range(1, len(dsplit) - 1):
3×
488
            dl.append(".".join(dsplit[i:]))
3×
489

490
    return dl
3×
491

492

493
def ddist(a, b):
3×
494
    if len(a) > len(b):
!
495
        return ddist(b, a)
!
496

497
    a = a.split('.')
!
498
    b = b.split('.')
!
499

500
    d = [x[0] == x[1] for x in zip(a[::-1], b[::-1])]
!
501
    if False in d:
!
502
        return d.index(False)
!
503
    return len(a)
!
504

505

506
def avg_domain_distance(d1, d2):
3×
507
    dd = 0
!
508
    n = 0
!
509
    for a in d1.split(';'):
!
510
        for b in d2.split(';'):
!
511
            d = ddist(a, b)
!
512
            # log.debug("ddist %s %s -> %d" % (a, b, d))
513
            dd += d
!
514
            n += 1
!
515
    return int(dd / n)
!
516

517

518
def sync_nsmap(nsmap, elt):
3×
519
    fix = []
!
520
    for ns in elt.nsmap:
!
521
        if ns not in nsmap:
!
522
            nsmap[ns] = elt.nsmap[ns]
!
523
        elif nsmap[ns] != elt.nsmap[ns]:
!
524
            fix.append(ns)
!
525
        else:
526
            pass
!
527

528

529
def rreplace(s, old, new, occurrence):
3×
530
    li = s.rsplit(old, occurrence)
!
531
    return new.join(li)
!
532

533

534
def load_callable(name):
3×
535
    from importlib import import_module
3×
536
    p, m = name.rsplit(':', 1)
3×
537
    mod = import_module(p)
3×
538
    return getattr(mod, m)
3×
539

540

541
# semantics copied from https://github.com/lordal/md-summary/blob/master/md-summary
542
# many thanks to Anders Lordahl & Scotty Logan for the idea
543
def guess_entity_software(e):
3×
544
    for elt in chain(e.findall(".//{%s}SingleSignOnService" % NS['md']),
!
545
                     e.findall(".//{%s}AssertionConsumerService" % NS['md'])):
546
        location = elt.get('Location')
!
547
        if location:
!
548
            if 'Shibboleth.sso' in location \
!
549
                    or 'profile/SAML2/POST/SSO' in location \
550
                    or 'profile/SAML2/Redirect/SSO' in location \
551
                    or 'profile/Shibboleth/SSO' in location:
552
                return 'Shibboleth'
!
553
            if location.endswith('saml2/idp/SSOService.php') or 'saml/sp/saml2-acs.php' in location:
!
554
                return 'SimpleSAMLphp'
!
555
            if location.endswith('user/authenticate'):
!
556
                return 'KalturaSSP'
!
557
            if location.endswith('adfs/ls') or location.endswith('adfs/ls/'):
!
558
                return 'ADFS'
!
559
            if '/oala/' in location or 'login.openathens.net' in location:
!
560
                return 'OpenAthens'
!
561
            if '/idp/SSO.saml2' in location or '/sp/ACS.saml2' in location or 'sso.connect.pingidentity.com' in location:
!
562
                return 'PingFederate'
!
563
            if 'idp/saml2/sso' in location:
!
564
                return 'Authentic2'
!
565
            if 'nidp/saml2/sso' in location:
!
566
                return 'Novell Access Manager'
!
567
            if 'affwebservices/public/saml2sso' in location:
!
568
                return 'CASiteMinder'
!
569
            if 'FIM/sps' in location:
!
570
                return 'IBMTivoliFIM'
!
571
            if 'sso/post' in location \
!
572
                    or 'sso/redirect' in location \
573
                    or 'saml2/sp/acs' in location \
574
                    or 'saml2/ls' in location \
575
                    or 'saml2/acs' in location \
576
                    or 'acs/redirect' in location \
577
                    or 'acs/post' in location \
578
                    or 'saml2/sp/ls/' in location:
579
                return 'PySAML'
!
580
            if 'engine.surfconext.nl' in location:
!
581
                return 'SURFConext'
!
582
            if 'opensso' in location:
!
583
                return 'OpenSSO'
!
584
            if 'my.salesforce.com' in location:
!
585
                return 'Salesforce'
!
586

587
    entity_id = e.get('entityID')
!
588
    if '/shibboleth' in entity_id:
!
589
        return 'Shibboleth'
!
590
    if entity_id.endswith('/metadata.php'):
!
591
        return 'SimpleSAMLphp'
!
592
    if '/openathens' in entity_id:
!
593
        return 'OpenAthens'
!
594

595
    return 'other'
!
596

597

598
def is_text(x):
3×
599
    return isinstance(x, six.string_types) or isinstance(x, six.text_type)
3×
600

601
def chunks(l, n):
3×
602
    """Yield successive n-sized chunks from l."""
603
    for i in range(0, len(l), n):
!
604
        yield l[i:i + n]
!
605

606

607
def urls_get(urls):
3×
608
    """
609
    Download multiple URLs and return all the response objects
610
    :param urls:
611
    :return:
612
    """
613
    return [url_get(url) for url in urls]
!
614

615

616
def url_get(url):
3×
617
    """
618
    Download an URL using a cache and return the response object
619
    :param url:
620
    :return:
621
    """
622
    s = None
3×
623
    info = dict()
3×
624

625
    if 'file://' in url:
3×
626
        s = requests.session()
3×
627
        s.mount('file://', FileAdapter())
3×
628
    else:
629
        s = CachedSession(cache_name="pyff_cache",
3×
630
                          backend=config.request_cache_backend,
631
                          expire_after=config.request_cache_time,
632
                          old_data_on_error=True)
633
    headers = {'User-Agent': "pyFF/{}".format(__version__), 'Accept': '*/*'}
3×
634
    try:
3×
635
        r = s.get(url, headers=headers, verify=False, timeout=config.request_timeout)
3×
636
    except Exception as ex:
3×
637
        log.warn(ex)
3×
638
        s = requests.Session()
3×
639
        r = s.get(url, headers=headers, verify=False, timeout=config.request_timeout)
3×
640

641
    if six.PY2:
3×
NEW
642
        r.encoding = "utf-8"
!
643

644
    log.debug("url_get({}) returns {} chrs encoded as {}".format(url, len(r.content), r.encoding))
3×
645

646
    if config.request_override_encoding is not None:
3×
647
        r.encoding = config.request_override_encoding
3×
648

649
    return r
3×
650

651

652
def safe_b64e(data):
3×
653
    if not isinstance(data, six.binary_type):
3×
654
        data = data.encode("utf-8")
3×
655
    return base64.b64encode(data).decode('ascii')
3×
656

657

658
def safe_b64d(s):
3×
659
    return base64.b64decode(s)
3×
660

661

662
def img_to_data(data, mime_type):
3×
663
    """Convert a file (specified by a path) into a data URI."""
664
    data64 = safe_b64e(data)
3×
665
    return 'data:%s;base64,%s' % (mime_type, data64)
3×
666

667

668
def short_id(data):
3×
669
    hasher = hashlib.sha1(data)
!
670
    return base64.urlsafe_b64encode(hasher.digest()[0:10]).rstrip('=')
!
671

672

673
def unicode_stream(data):
3×
674
    return six.BytesIO(data.encode('UTF-8'))
3×
675

676

677
def b2u(data):
3×
678
    if is_text(data):
3×
679
        return data
3×
680
    elif isinstance(data, six.binary_type):
3×
681
        return data.decode("utf-8")
3×
682
    elif isinstance(data, tuple) or isinstance(data, list):
3×
683
        return [b2u(item) for item in data]
3×
684
    elif isinstance(data, set):
3×
685
        return set([b2u(item) for item in data])
3×
686
    return data
3×
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2023 Coveralls, Inc