• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OCHA-DAP / hdx-ckan / #6918

20 Nov 2025 06:07PM UTC coverage: 78.052% (-0.01%) from 78.064%
#6918

push

coveralls-python

web-flow
Merge 16e3cf716 into d60d0cbc9

18 of 21 new or added lines in 4 files covered. (85.71%)

6 existing lines in 1 file now uncovered.

13482 of 17273 relevant lines covered (78.05%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.09
/ckanext-hdx_theme/ckanext/hdx_theme/helpers/helpers.py
1
import json
1✔
2
import datetime
1✔
3
import logging
1✔
4
import re
1✔
5
import six
1✔
6
import six.moves.urllib.parse as urlparse
1✔
7

8
import ckan.authz as new_authz
1✔
9
import ckan.lib.base as base
1✔
10
import ckan.lib.helpers as h
1✔
11
import ckan.logic as logic
1✔
12
import ckan.model as model
1✔
13
import ckan.plugins.toolkit as tk
1✔
14
import ckanext.activity.model.activity as activity_model
1✔
15
import ckanext.requestdata.model as requestdata_model
1✔
16
import ckanext.hdx_theme.version as version
1✔
17
from urllib.parse import urlencode, quote_plus
1✔
18

19
from six import text_type
1✔
20
from typing import Any, Optional, Union
1✔
21

22
from collections import OrderedDict
1✔
23
from ckan.lib import munge
1✔
24
from ckan.plugins import toolkit
1✔
25
from ckanext.hdx_package.helpers.caching import cached_objects_with_notifications, cached_objects_without_notifications
1✔
26
from ckanext.hdx_package.helpers.freshness_calculator import UPDATE_FREQ_INFO, UPDATE_FREQ_NEVER
1✔
27
from ckanext.hdx_package.helpers.p_code_filters_helper import are_new_p_code_filters_enabled
1✔
28
from ckanext.hdx_theme.util.light_redirect import switch_url_path
1✔
29
from ckanext.hdx_users.notifications_subscription_model import ObjectType
1✔
30

31
_ = toolkit._
1✔
32
request = toolkit.request
1✔
33
c = toolkit.c
1✔
34
config = toolkit.config
1✔
35
ungettext = toolkit.ungettext
1✔
36

37
log = logging.getLogger(__name__)
1✔
38

39
# Lower-cased resource formats that can be offered as a direct download.
downloadable_formats = {
    'csv', 'xls', 'xlsx', 'txt', 'jpg', 'jpeg', 'png', 'gif', 'zip', 'xml'
}


def is_downloadable(resource):
    """Tell whether a resource's format is one of the downloadable formats.

    :param resource: resource dict; only its ``format`` entry is read
    :returns: True when the lower-cased format is in ``downloadable_formats``
    :rtype: bool
    """
    # A resource may carry an explicit ``format: None``; treat it like a
    # missing format instead of failing on ``None.lower()``.
    res_format = (resource.get('format') or 'data').lower()
    return res_format in downloadable_formats
49

50

51
def filter_search_seo_category_keys(all_categ_keys):
    """
    Reduce the active search category keys to the ones relevant for SEO.

    The 'groups' and 'organization' keys are dropped. If more than two keys
    remain, an empty list is returned; otherwise the remaining keys are
    returned in their original order.
    """
    if not all_categ_keys:
        return []

    excluded = ('groups', 'organization')
    kept = []
    for categ_key in all_categ_keys:
        if categ_key not in excluded:
            kept.append(categ_key)

    if len(kept) > 2:
        return []
    return kept
63

64

65
def generate_canonical_link(request_path, request_args, canonical_url=None, params=None, no_params=False):
    """
    Generate canonical link information for SEO purposes.

    Args:
        request_path: current request path
        request_args: current request args; either a multi-value mapping
            exposing ``getlist`` or a plain mapping (``None`` is allowed)
        canonical_url: set a specific canonical url (not automatic); checks
            current path (w/o params) versus canonical_url -> useful for pages
            that are available via /page/{item} where the item should be name,
            but can be id
        params: array of whitelisted params that are kept in the canonical url;
            any other param triggers the canonical link
        no_params: if True then we'll remove all params

    Returns:
        dict with ``add_canonical`` (bool) and ``canonical_url`` (str)
    """
    if not canonical_url:
        canonical_url = request_path

    # canonical urls are the desktop pages
    if canonical_url.startswith('/m/'):
        canonical_url = canonical_url.replace('/m/', '/', 1)

    if no_params:
        if request_path != canonical_url:
            # where current URL base differs and it's not about the params
            return {'add_canonical': True, 'canonical_url': canonical_url}
        # if we have any params we need to set canonical
        return {'add_canonical': bool(request_args), 'canonical_url': canonical_url}

    # everything that is not whitelisted counts as an "extra" param
    extra_args = dict(request_args) if request_args else {}
    for param in params or ():
        extra_args.pop(param, None)

    # if we have any params beyond the whitelisted ones then we need to set canonical
    # or if we have a different path (on mobile)
    add_canonical = len(extra_args) > 0 or request_path != canonical_url

    if add_canonical and params and request_args:
        # Plain dicts have no getlist(); fall back to .get() so both kinds of
        # mapping are supported.
        getlist = getattr(request_args, 'getlist', None)
        query_parts = []
        for param in params:
            if getlist is not None:
                values = getlist(param)
            else:
                raw_value = request_args.get(param)
                if raw_value is None:
                    values = []
                elif isinstance(raw_value, (list, tuple)):
                    values = raw_value
                else:
                    values = [raw_value]
            for value in values:
                if value is not None:
                    query_parts.append(f"{param}={quote_plus(str(value))}")

        if query_parts:
            canonical_url = f"{canonical_url}?{'&'.join(query_parts)}"

    return {'add_canonical': add_canonical, 'canonical_url': canonical_url}
114

115

116
def is_not_zipped(res):
    """Return False when the resource url ends in ``zip`` or ``rar``.

    A missing or ``None`` url is treated as zipped, so that no preview is
    shown for a resource without a url.

    :param res: resource dict; only its ``url`` entry is read
    :rtype: bool
    """
    # ``or 'zip'`` also covers an explicit ``url: None`` which would otherwise
    # fail on ``None.strip()``.
    url = (res.get('url') or 'zip').strip().lower()
    return not url.endswith(('zip', 'rar'))
121

122

123
NOT_HXL_FORMAT_LIST = frozenset({'zipped shapefile', 'zip', 'geojson', 'json', 'kml', 'kmz', 'rar', 'pdf', 'excel',
1✔
124
                       'zipped', 'docx', 'doc', '7z'})
125

126

127
def _any(item, ext_list=NOT_HXL_FORMAT_LIST):
1✔
128
    return any(i in item for i in ext_list)
×
129

130

131
def is_not_hxl_format(res_format):
1✔
132
    if not res_format:
×
133
        return False
×
134
    return _any([res_format.lower()], NOT_HXL_FORMAT_LIST)
×
135

136

137
def get_facet_items_dict(facet, limit=1000, exclude_active=False):
    """Return the facet items together with the number of items in the facet.

    :param facet: name of the search facet
    :param limit: maximum number of items to return; replaced by the value
        from ``c.search_facets_limits`` when that mapping is set
    :param exclude_active: forwarded to the core ``h.get_facet_items_dict``
    :returns: tuple ``(items, number_of_items_in_c.search_facets)``
    """
    facet_items = h.get_facet_items_dict(
        facet, limit=limit, exclude_active=exclude_active)
    # The count is taken from the items currently present in c.search_facets.
    item_count = len(c.search_facets.get(facet)['items'])

    facet_limits = c.search_facets_limits
    if facet_limits:
        limit = facet_limits.get(facet)

    visible_items = facet_items[:limit] if limit else facet_items
    return (visible_items, item_count)
155

156

157
def get_last_modifier_user(dataset_id=None, group_id=None):
    """Look up the user and timestamp of the most recent activity.

    When both ids are given, the dataset activity takes precedence over the
    group activity.

    :returns: dict with ``user_obj`` (result of ``model.User.get``) and
        ``last_modified`` (ISO formatted timestamp); both are None when no
        activity was found
    """
    latest_activities = None
    if group_id is not None:
        latest_activities = activity_model.group_activity_list(group_id, limit=1, offset=0)
    if dataset_id:
        latest_activities = activity_model.package_activity_list(dataset_id, limit=1, offset=0)

    if not latest_activities:
        # no activity found: both values are returned as None
        return {"user_obj": None, "last_modified": None}

    newest = latest_activities[0]
    modifier_id = newest.user_id
    modified_at = newest.timestamp
    return {"user_obj": model.User.get(modifier_id), "last_modified": modified_at.isoformat()}
171

172

173
def get_filtered_params_list(params):
    """Return the ``(key, value)`` pairs of ``params`` except 'q' and 'sort'."""
    ignored = {'q', 'sort'}
    return [(name, val) for name, val in params.items() if name not in ignored]
179

180

181
# def get_last_revision_package(package_id):
182
#     #     pkg_list  = model.Session.query(model.Package).filter(model.Package.id == package_id).all()
183
#     #     pkg = pkg_list[0]
184
#     #     return pkg.latest_related_revision.id
185
#     activity_objects = model.activity.package_activity_list(
186
#         package_id, limit=1, offset=0)
187
#     if len(activity_objects) > 0:
188
#         activity = activity_objects[0]
189
#         return activity.revision_id
190
#     return None
191

192

193
# def get_last_revision_group(group_id):
194
#     #     grp_list  = model.Session.query(model.Group).filter(model.Group.id == group_id).all()
195
#     #     grp = grp_list[0]
196
#     #     last_rev = grp.all_related_revisions[0][0]
197
#     activity_objects = model.activity.group_activity_list(
198
#         group_id, limit=1, offset=0)
199
#     if len(activity_objects) > 0:
200
#         activity = activity_objects[0]
201
#         return activity.revision_id
202
#     return None
203

204

205
def get_last_revision_timestamp_group(group_id):
    """Return the rendered timestamp of the group's latest activity, or None."""
    latest = activity_model.group_activity_list(group_id, limit=1, offset=0)
    if len(latest) == 0:
        return None
    return h.render_datetime(latest[0].timestamp)
212

213

214
def get_dataset_date_format(date):
    """Reformat a ``mm/dd/YYYY`` date, or a ``start-end`` range of two such
    dates, as ``Mon D, YYYY``.

    :param date: a single date string or two dates joined by ``-``
    :returns: the formatted date(s) joined by ``-``, or False when any part
        cannot be parsed
    """
    # Is this a range?
    drange = date.split('-')
    if len(drange) != 2:
        drange = [date]

    dates = []
    for d in drange:
        try:
            dt = datetime.datetime.strptime(d, '%m/%d/%Y')
        except ValueError:
            # only parsing errors mean "not a date"; do not hide anything else
            return False
        # '%-d' is a glibc extension; build the unpadded day by hand so the
        # result does not depend on the platform's strftime.
        dates.append('{0} {1}, {2}'.format(dt.strftime('%b'), dt.day, dt.year))
    return '-'.join(dates)
227

228

229
def get_group_followers(grp_id):
    """Return the result of the ``group_follower_count`` action for ``grp_id``."""
    context = {'model': model, 'session': model.Session}
    follower_count_action = logic.get_action('group_follower_count')
    return follower_count_action(context, {'id': grp_id})
234

235

236
def hdx_dataset_follower_count(pkg_id):
    """Return the result of the ``dataset_follower_count`` action for ``pkg_id``."""
    context = {'model': model, 'session': model.Session}
    follower_count_action = logic.get_action('dataset_follower_count')
    return follower_count_action(context, {'id': pkg_id})
241

242

243
def get_group_members(grp_id):
    """Return the number of entries the ``member_list`` action reports.

    The first attempt asks for members with ``object_type: 'user'``. If that
    raises ``logic.NotAuthorized``, the action is called again with
    ``include_users: False``.
    """
    try:
        members = logic.get_action('member_list')(
            {'model': model, 'session': model.Session},
            {'id': grp_id, 'object_type': 'user'},
        )
    except logic.NotAuthorized:
        members = logic.get_action('member_list')(
            {'model': model, 'session': model.Session},
            {'id': grp_id, 'include_users': False},
        )
    member_count = len(members)
    return member_count
253

254

255
def hdx_get_user_info(user_id):
    """Fetch a user's data through the ``hdx_basic_user_info`` action.

    Calls ``base.abort(403, ...)`` when the action raises
    ``logic.NotAuthorized``.
    """
    context = dict(model=model, session=model.Session, user=c.user or c.author)
    basic_info_action = tk.get_action
    try:
        user_info = basic_info_action('hdx_basic_user_info')(context, {'id': user_id})
    except logic.NotAuthorized:
        base.abort(403, _('Unauthorized to see organization member list'))
    return user_info
263

264

265
def hdx_get_org_member_info(user_id, org_name=None):
    """Return the basic user info, enriched with the datasets the user maintains.

    :param user_id: id of the user to look up
    :param org_name: when given, only this organization is inspected;
        otherwise the organizations come from the
        ``hdx_organization_list_for_user`` action
    :returns: the ``hdx_basic_user_info`` dict; when at least one organization
        has datasets maintained by the user, ``maint_orgs_pkgs`` holds those
        organizations, each with a ``pkgs`` list
    """
    context = dict(model=model, session=model.Session, user=c.user or c.author)

    if org_name:
        candidate_orgs = [{'name': org_name}]
    else:
        candidate_orgs = tk.get_action('hdx_organization_list_for_user')(context, {'id': user_id})

    user_info = tk.get_action('hdx_basic_user_info')(context, {'id': user_id})
    orgs_with_pkgs = []

    for org_dict in candidate_orgs:
        try:
            maintained = _get_packages_for_maintainer(context, user_id, org_dict['name'])
            if not maintained:
                continue
            org_dict['pkgs'] = maintained
            orgs_with_pkgs.append(org_dict)
            user_info['maint_orgs_pkgs'] = orgs_with_pkgs
        except logic.NotAuthorized:
            base.abort(403, _('Unauthorized to see organization member list'))
    return user_info
288

289

290
# HDX-8554 - org has to be organization name
291
def _get_packages_for_maintainer(context, id, org_name):
    """Search (max 100 rows) for datasets of ``org_name`` maintained by ``id``.

    :returns: the ``results`` list of the ``package_search`` action
    """
    search_query = {
        'q': '*:*',
        'fq': 'maintainer:{0}, organization:{1}'.format(id, org_name),
        'rows': 100,
    }
    search_result = logic.get_action('package_search')(context, search_query)
    return search_result['results']
298

299

300
def markdown_extract_strip(text, extract_length=190):
    '''Return a single-line version of ``h.markdown_extract(text, extract_length)``.

    Trailing newlines are stripped, remaining newlines become spaces,
    carriage returns are removed and double quotes are replaced by ``&quot;``.
    '''
    extracted = h.markdown_extract(text, extract_length)
    single_line = extracted.rstrip('\n').replace('\n', ' ')
    without_cr = single_line.replace('\r', '')
    return without_cr.replace('"', "&quot;")
308

309

310
def render_markdown_strip(text, extract_length=190):
    '''Return a single-line version of ``h.render_markdown(text, extract_length)``.

    Trailing newlines are stripped, remaining newlines become spaces,
    carriage returns are removed and double quotes are replaced by ``&quot;``.

    NOTE(review): ``extract_length`` is passed as the second positional
    argument of ``h.render_markdown``; CKAN documents that parameter as
    ``auto_link``, so presumably no truncation happens here - confirm.
    '''
    rendered = h.render_markdown(text, extract_length)
    single_line = rendered.rstrip('\n').replace('\n', ' ')
    without_cr = single_line.replace('\r', '')
    return without_cr.replace('"', "&quot;")
318

319

320
def methodology_bk_compat(meth, other, render=True):
    """Normalise a methodology value and its free-text companion.

    :param meth: methodology value; either one of the standard methodologies
        or a free text, optionally prefixed with ``'Other - '``
    :param other: free-text description used when ``meth`` is not a standard
        methodology
    :param render: when True the free text is passed through
        ``h.render_markdown``
    :returns: ``(None, None)`` when both inputs are empty, ``(meth, None)``
        for a standard methodology other than "Other", otherwise
        ``("Other", description)``
    """
    if not meth and not other:
        return (None, None)

    standard_meths = ("Census", "Sample Survey",
                      "Direct Observational Data/Anecdotal Data", "Registry", "Other")
    if meth in standard_meths and meth != "Other":
        return (meth, None)

    if other:
        description = other
    else:
        # Values of the form 'Other - <text>': keep the text after the prefix.
        # Taking split('Other - ')[0] used to return '' for exactly these
        # values, throwing the description away.
        legacy_prefix = 'Other - '
        description = meth[len(legacy_prefix):] if meth.startswith(legacy_prefix) else meth

    if render:
        description = h.render_markdown(description)
    return ("Other", description)
341

342

343
def _hdx_strftime(_date):
1✔
344
    result = None
1✔
345
    try:
1✔
346
        result = _date.strftime('%d %B %Y')
1✔
347
    except ValueError as e:
×
348
        month = datetime.date(1900, _date.month, 1).strftime('%B')
×
NEW
349
        result = str(_date.day) + " " + month + " " +  str(_date.year)
×
350
    return result
1✔
351

352

353
def render_date_from_concat_str(_str, separator='-'):
    """Render a date or date range string as ``DD Month YYYY - DD Month YYYY``.

    Two input shapes are handled:

    * a range containing ``TO`` (e.g. ``[2020-01-01T00:00:00 TO *]``); each
      side is parsed as ``YYYY-MM-DD`` and a side containing ``*`` is rendered
      as today's date
    * ``mm/dd/YYYY`` dates joined by ``separator``; parts that cannot be
      parsed are logged and left out

    :returns: the rendered string, '' for empty input
    """
    if not _str:
        return ''

    rendered = []
    if 'TO' in _str:
        bounds = str(_str).replace('[', '').replace(']', '').replace(' ', '').split('TO')
        for bound in bounds:
            if '*' in bound:
                rendered.append(datetime.datetime.today().strftime('%d %B %Y'))
            else:
                _date = datetime.datetime.strptime(bound.split('T')[0], "%Y-%m-%d")
                rendered.append(_hdx_strftime(_date))
    else:
        for strdate in _str.split(separator):
            try:
                parsed = datetime.datetime.strptime(strdate.strip(), '%m/%d/%Y')
                rendered.append(parsed.strftime('%d %B %Y'))
            except ValueError as e:
                log.warning(e)

    # Joining only the successfully rendered parts avoids the dangling ' - '
    # that was left behind when a trailing part failed to parse.
    return ' - '.join(rendered)
380

381

382
def hdx_build_nav_icon_with_message(menu_item, title, **kw):
    """Build a nav icon link and, if a ``message`` kwarg is given, add it to
    the link.

    The message is inserted as a ``nav-short-message`` span in front of every
    closing ``</a>`` of the markup returned by ``h.build_nav_icon``.
    """
    nav_html = h.build_nav_icon(menu_item, title, **kw)
    message = kw.get('message')
    if not message:
        return nav_html

    message_span = ' <span class="nav-short-message">{message}</span></a>'.format(
        message=message)
    return h.literal(str(nav_html).replace('</a>', message_span))
391

392

393
def hdx_build_nav_no_icon(menu_item, title, **kw):
    """Build a nav link like ``h.build_nav_icon`` but without the ``<i>`` icon.

    The span from one character before ``<i `` up to and including ``</i>``
    is replaced by `` class="no-icon">``. Markup without an icon is returned
    unchanged (as a literal).
    """
    markup = str(h.build_nav_icon(menu_item, title, **kw))
    icon_start = markup.find('<i ') - 1
    icon_end = markup.find('</i>') + len('</i>')
    if icon_start <= 0:
        return h.literal(markup)
    without_icon = markup[:icon_start] + ' class="no-icon">' + markup[icon_end:]
    return h.literal(without_icon)
404

405

406
def hdx_linked_user(user, maxlength=0):
    """Return ``h.linked_user`` markup with self-closing ``<img .../>`` tags removed."""
    linked_markup = h.linked_user(user, maxlength)
    without_images = re.sub(r"<img[^>]+/>", "", linked_markup)
    return h.literal(without_images)
410

411

412
def hdx_linked_username(user, userobj, maxlength=0, avatar=20):
    """Return the user's avatar followed by a link for the user.

    :param user: a ``model.User`` or a value that identifies one
    :param userobj: truthy when a user is logged in; then the link points to
        the user page and shows the display name, otherwise the link opens
        the login popup and shows the user name
    :param maxlength: when non-zero, longer names are cut and suffixed '...'
    :param avatar: size passed to ``h.user_image``
    :returns: the plain identifier when no such user exists, otherwise a
        literal with icon and link
    """
    if not isinstance(user, model.User):
        user_name = text_type(user)
        user = model.User.get(user_name)
        if not user:
            return user_name

    if not user:
        return None

    if model.User.VALID_NAME.match(user.name):
        name = user.name
    else:
        name = user.id
    display_name = user.display_name if userobj else user.name

    if maxlength and len(display_name) > maxlength:
        display_name = display_name[:maxlength] + '...'

    if userobj:
        # NOTE(review): url_for is not among the imports visible at the top
        # of the file; presumably it is defined further down - confirm.
        link = h.link_to(display_name, url_for('user.read', id=name))
    else:
        link = '''<a onclick="showOnboardingWidget('#loginPopup');" href="#" aria-label="login">%s</a>''' % display_name

    icon = h.user_image(user.id, size=avatar)
    return h.literal(u'{icon} {link}'.format(icon=icon, link=link))
437

438

439
def hdx_show_singular_plural(num, singular_word, plural_word, show_number=True):
    """Pick the singular word when ``num == 1``, else the plural word.

    With ``show_number`` the result is prefixed by ``num`` and a space.
    """
    word = singular_word if num == 1 else plural_word
    if not show_number:
        return word
    return str(num) + ' ' + word
450

451

452
def hdx_num_of_new_related_items():
    """Count the items in ``c.pkg.related`` created less than 30 days ago.

    Items without a ``created`` value are ignored.
    """
    max_days = 30
    now = datetime.datetime.now()
    return sum(
        1
        for related in c.pkg.related
        if related.created and (now - related.created).days < max_days
    )
463

464

465
def hdx_user_count(only_sysadmin=False, include_site_user=False):
    """Count the users whose state is not 'deleted'.

    :param only_sysadmin: restrict the count to sysadmin users
    :param include_site_user: when False, the user named like the configured
        ``ckan.site_id`` is left out
    """
    site_id = config.get('ckan.site_id')
    user_query = model.Session.query(model.User).filter(model.User.state != 'deleted')

    extra_filters = []
    if only_sysadmin:
        extra_filters.append(model.User.sysadmin.is_(True))
    if not include_site_user:
        extra_filters.append(model.User.name != site_id)

    for criterion in extra_filters:
        user_query = user_query.filter(criterion)
    return user_query.count()
473

474

475
def hdx_member_roles_list():
    """Return the result of the ``member_roles_list`` action for the current user."""
    context = dict(model=model, session=model.Session, user=c.user or c.author)
    roles_action = tk.get_action('member_roles_list')
    return roles_action(context, {})
479

480

481
def hdx_version():
    """Return ``hdx_version`` from the ``ckanext.hdx_theme.version`` module."""
    current_version = version.hdx_version
    return current_version
483

484

485
def hdx_get_extras_element(data_dict, key='key', value_key='org_url', ret_key='value'):
    """Read ``value_key`` from ``data_dict`` or from its ``extras`` list.

    A top-level ``data_dict[value_key]`` wins. Otherwise each element of
    ``data_dict['extras']`` whose ``key`` entry equals ``value_key``
    contributes its ``ret_key`` entry; the last match is returned.

    :returns: the value found, or '' when there is none
    """
    if value_key in data_dict:
        return data_dict[value_key]

    found = ''
    for extra in data_dict.get('extras', []):
        if extra[key] == value_key:
            found = extra[ret_key]
    return found
496

497

498
def load_json(obj, **kw):
    """Deserialize the JSON document ``obj``; ``kw`` is passed to ``json.loads``."""
    parsed = json.loads(obj, **kw)
    return parsed
500

501

502
def escaped_dump_json(obj, **kw):
    """Serialize ``obj`` to JSON with every ``</`` rewritten as ``<\\/``.

    The rewrite keeps a closing tag inside the data from ending a surrounding
    ``<script>`` element. ``kw`` is passed to ``json.dumps``.
    """
    serialized = json.dumps(obj, **kw)
    return serialized.replace('</', '<\\/')
505

506

507
def hdx_group_followee_list():
    """Return the groups followed by the current user, organizations excluded."""
    context = dict(
        model=model,
        session=model.Session,
        user=c.user or c.author,
        auth_user_obj=c.userobj,
        for_view=True,
    )
    followees = logic.get_action('group_followee_list')(context, {'id': c.userobj.id})

    # filter out the orgs
    return [followee for followee in followees if not followee['is_organization']]
518

519

520
def hdx_organizations_available_with_roles():
    """
    Return the organizations the current user can read, annotated with the
    user's role.

    Each organization dict gets ``role`` ('sysadmin', 'admin', 'editor' or
    'member') and ``has_add_dataset_rights`` (False only for 'member'). The
    list is sorted by lower-cased display name and then passed through
    ``org_add_last_updated_field``.
    """
    import ckanext.hdx_org_group.helpers.organization_helper as hdx_helper

    readable_orgs = h.organizations_available('read', include_dataset_count=True)
    editor_org_ids = []
    admin_org_ids = []
    is_sysadmin = new_authz.is_sysadmin(c.user)
    if not is_sysadmin:
        # role lookups are only needed for regular users
        editor_org_ids = {org['id'] for org in h.organizations_available('create_dataset')}
        admin_org_ids = {org['id'] for org in h.organizations_available('admin')}

    for org in readable_orgs:
        if is_sysadmin:
            role = 'sysadmin'
        elif org['id'] in admin_org_ids:
            role = 'admin'
        elif org['id'] in editor_org_ids:
            role = 'editor'
        else:
            role = 'member'
        org['has_add_dataset_rights'] = role != 'member'
        org['role'] = role

    readable_orgs.sort(key=lambda org: org['display_name'].lower())
    hdx_helper.org_add_last_updated_field(readable_orgs)
    return readable_orgs
549

550

551
def hdx_remove_schema_and_domain_from_url(url):
    """Strip the scheme (and for ``/preview`` urls also the host) from ``url``.

    * urls ending in ``/preview`` (files that are transformed before they can
      be previewed) lose scheme and host
    * every other url only loses its scheme, so it comes out scheme-relative
      (``//host/path``) and the browser does not block it as mixed active
      content
    """
    parts = urlparse.urlparse(url)
    # path, params, query, fragment
    tail = parts[2:6]
    if url.endswith('/preview'):
        stripped = ('', '') + tail
    else:
        stripped = ('', parts[1]) + tail
    return urlparse.urlunparse(stripped)
565

566

567
def hdx_get_ckan_config(config_name):
    """Return ``config.get(config_name)`` from the CKAN configuration."""
    config_value = config.get(config_name)
    return config_value
569

570

571
def get_group_name_from_list(glist, gid):
    """Return the ``title`` of the first group in ``glist`` whose ``id`` is
    ``gid``, or "" when there is no such group."""
    matching_titles = (grp['title'] for grp in glist if grp['id'] == gid)
    return next(matching_titles, "")
576

577

578
def hdx_follow_link(obj_type, obj_id, extra_text, cls=None, confirm_text=None):
    """Render the follow/unfollow link snippet for the given object.

    ``obj_type`` is lower-cased and must be one of ``h._follow_objects``.

    :returns: the rendered ``search/snippets/follow_link.html`` snippet, or
        '' when no user is logged in
    """
    obj_type = obj_type.lower()
    assert obj_type in h._follow_objects

    # Only a logged in user gets the follow/unfollow link
    if not c.user:
        return ''

    context = dict(model=model, session=model.Session, user=c.user)
    am_following_action = 'am_following_%s' % obj_type
    is_following = logic.get_action(am_following_action)(context, {'id': obj_id})
    return h.snippet(
        'search/snippets/follow_link.html',
        following=is_following,
        obj_id=obj_id,
        obj_type=obj_type,
        extra_text=extra_text,
        confirm_text=confirm_text,
        cls=cls,
    )
594

595

596
def follow_status(obj_type, obj_id):
    """Return the result of the ``am_following_<obj_type>`` action.

    ``obj_type`` is lower-cased and must be one of ``h._follow_objects``.
    False is returned when no user is logged in.
    """
    obj_type = obj_type.lower()
    assert obj_type in h._follow_objects

    if not c.user:
        return False

    context = dict(model=model, session=model.Session, user=c.user)
    am_following_action = 'am_following_%s' % obj_type
    return logic.get_action(am_following_action)(context, {'id': obj_id})
606

607

608
def one_active_item(items):
    """Return True as soon as one element of ``items`` has a truthy ``active``."""
    return any(entry['active'] for entry in items)
613

614

615
def feature_count(features):
    """Sum the ``count`` of every facet item of every facet named in ``features``."""
    return sum(
        facet_item['count']
        for facet_name in features
        for facet_item in h.get_facet_items_dict(facet_name)
    )
622

623

624
def hdx_follow_button(obj_type, obj_id, **kw):
    ''' Modified version of the ckan core follow_button() helper that
    returns a simple link for a bootstrap dropdown menu.

    If the user is not logged in an empty string is returned instead.

    :param obj_type: the type of the object to be followed when the follow
        button is clicked, e.g. 'user' or 'dataset'
    :type obj_type: string
    :param obj_id: the id of the object to be followed when the follow button
        is clicked
    :type obj_id: string
    :param kw: ``follow_extra_text`` overrides the default 'This Data' label;
        the remaining keyword arguments are passed to the snippet as ``params``

    :returns: a follow button as an HTML snippet
    :rtype: string

    '''
    obj_type = obj_type.lower()
    assert obj_type in h._follow_objects

    # Only a logged in user gets the follow/unfollow button
    if not c.user:
        return ''

    context = dict(model=model, session=model.Session, user=c.user)
    am_following_action = 'am_following_%s' % obj_type
    is_following = logic.get_action(am_following_action)(context, {'id': obj_id})
    extra_text = kw.pop('follow_extra_text', _('This Data'))
    return h.snippet(
        'snippets/hdx_follow_button.html',
        following=is_following,
        obj_id=obj_id,
        obj_type=obj_type,
        follow_extra_text=extra_text,
        params=kw,
    )
660

661

662
def hdx_add_url_param(alternative_url=None, controller=None, action=None,
                      extras=None, new_params=None, unwanted_keys=[]):
    '''
    Modified CKAN helper that also allows removing some params.

    Starts from the current request args, without ``page`` and without any
    key listed in ``unwanted_keys``, and adds ``new_params`` to them.

    controller, action & extras (dict) are handed to
    ``h._create_url_with_params`` to build the base url.

    This can be overridden by providing an alternative_url, which is then
    used with ``h._url_with_params`` instead.
    '''
    url_params = {
        (name, value)
        for name, value in request.args.items()
        if name != 'page' and name not in unwanted_keys
    }
    if new_params:
        url_params.update(new_params.items())

    if alternative_url:
        return h._url_with_params(alternative_url, url_params)
    return h._create_url_with_params(params=url_params, controller=controller,
                                     action=action, extras=extras)
686

687

688
def https_load(url):
    """Turn a leading ``http://`` into ``//`` (scheme-relative); other urls
    are returned unchanged."""
    http_prefix = 'http://'
    if url.startswith(http_prefix):
        return '//' + url[len(http_prefix):]
    return url
690

691

692
def count_public_datasets_for_group(datasets_list):
    """Count the datasets whose ``private`` entry compares equal to False."""
    public_count = 0
    for dataset in datasets_list:
        # equality (not truthiness) on purpose: a None value is not counted
        if dataset['private'] == False:
            public_count += 1
    return public_count
695

696

697
def check_all_str_fields_not_empty(dictionary, warning_template, skipped_keys=(), errors=None):
    """Check that no field of ``dictionary`` is empty.

    Values offering ``strip()`` are stripped first, so whitespace-only strings
    count as empty. The first empty field is logged as a warning, reported
    through ``add_error`` and ends the check.

    :param dictionary: mapping of field name to value
    :param warning_template: format string receiving the name of the empty field
    :param skipped_keys: field names that are not checked
    :param errors: optional list that collects the error entries
    :returns: False when an empty field was found, True otherwise
    """
    for key, value in dictionary.items():
        if key in skipped_keys:
            continue
        # Only strip what can be stripped: a truthy non-string value (number,
        # list, ...) is simply "not empty" instead of raising AttributeError.
        if value and hasattr(value, 'strip'):
            value = value.strip()
        if not value:
            message = warning_template.format(key)
            log.warning(message)
            add_error('Empty field', message, errors)
            return False
    return True
707

708

709
def add_error(type, message, errors):
    """Append ``{'_type': type, 'message': message}`` to ``errors``.

    Nothing happens when ``errors`` is not a list.
    """
    if not isinstance(errors, list):
        return
    error_entry = {'_type': type, 'message': message}
    errors.append(error_entry)
712

713

714
def hdx_popular(type_, number, min=1, title=None):
    ''' Render the ``snippets/popular.html`` snippet (popular icon).

    :param type_: 'views', 'recent views' or 'downloads' select a built-in
        pluralised title; any other value requires ``title``
    :param number: the number the icon stands for
    :param min: passed to the snippet unchanged
    :param title: custom title, used when ``type_`` is not a built-in one
    :raises Exception: when ``type_`` is not a built-in one and no title is given
    '''
    from ckan.lib.helpers import snippet as render_snippet

    # The ungettext calls stay literal so that message extraction finds them.
    if type_ == 'views':
        title = ungettext('{number} view', '{number} views', number)
    elif type_ == 'recent views':
        title = ungettext('{number} recent view', '{number} recent views', number)
    elif type_ == 'downloads':
        title = ungettext('{number} download', '{number} downloads', number)
    elif not title:
        raise Exception('popular() did not recieve a valid type_ or title')

    return render_snippet('snippets/popular.html', title=title, number=number, min=min)
726

727

728
def hdx_license_list():
    """Return the license options as ``{'value': id, 'text': title}`` dicts."""
    license_options = []
    for license_title, license_id in model.Package.get_license_options():
        license_options.append({'value': license_id, 'text': license_title})
    return license_options
732

733

734
def hdx_methodology_list():
    """Return the methodology options for a select box.

    The first option is the '-- Please select --' placeholder with value '-1';
    every other option uses the methodology name as both value and text.
    """
    methodology_names = (
        'Census',
        'Sample Survey',
        'Direct Observational Data/Anecdotal Data',
        'Registry',
        'Other',
    )
    options = [{'value': '-1', 'text': '-- Please select --'}]
    options.extend({'value': name, 'text': name} for name in methodology_names)
    return options
740

741

742
def hdx_location_list(include_world=True):
    """Return the cached locations as ``{'value': name, 'text': title}`` dicts.

    With ``include_world`` the location named 'world' is moved to the front;
    otherwise the order of the ``cached_group_list`` action is kept.
    """
    pinned_names = ['world'] if include_world else []

    pinned = []
    others = []
    for loc in logic.get_action('cached_group_list')({}, {}):
        option = {'value': loc.get('name'), 'text': loc.get('title')}
        target = pinned if loc.get('name') in pinned_names else others
        target.append(option)

    return pinned + others
759

760

761
def hdx_location_dict(include_world=True):
    """Return an OrderedDict mapping each cached location title to itself.

    With ``include_world`` the 'world' location comes first; with
    ``include_world=False`` it is left out.
    """
    world_location_name = 'world'
    pinned_names = [world_location_name] if include_world else []

    locations = logic.get_action('cached_group_list')({}, {})

    pinned = OrderedDict()
    others = OrderedDict()
    for loc in locations:
        title = loc.get('title')
        name = loc.get('name')
        if name in pinned_names:
            pinned[title] = title
        elif name == world_location_name and include_world is False:
            continue
        else:
            others[title] = title

    merged = OrderedDict(pinned)
    merged.update(others)
    return merged
781

782

783
def hdx_user_orgs_dict(user_id, include_org_type=False):
    '''
    Build an ordered mapping of the organizations returned by the
    ``organization_list_for_user`` action, keyed by display name.

    :param user_id: passed as ``id`` to the ``organization_list_for_user`` action
    :param include_org_type: when truthy, add an ``org_type`` entry taken from the active
        ``hdx_org_type`` group extras (or from the org dict itself if it already has one)
    :type include_org_type: bool

    :returns: ``{display_name: {'name': display_name[, 'org_type': ...]}}``; an empty
        mapping if anything fails (the failure is logged)
    :rtype: OrderedDict
    '''
    try:
        orgs = _get_action('organization_list_for_user', {'id': user_id})

        org_type_by_group_id = {}
        if include_org_type:
            org_extras = model.Session.query(model.GroupExtra).filter_by(key='hdx_org_type', state='active').all()
            org_type_by_group_id = {org_extra.group_id: org_extra.value for org_extra in org_extras}

        result = OrderedDict()
        for org in orgs:
            display_name = org.get('display_name')
            org_data = {'name': display_name}
            if include_org_type:
                # Read the type into the result only; the dicts returned by the action are
                # left untouched.
                org_id = org.get('id')
                if org_id in org_type_by_group_id:
                    org_data['org_type'] = org_type_by_group_id[org_id]
                elif 'org_type' in org:
                    org_data['org_type'] = org['org_type']
            result[display_name] = org_data

        return result

    except Exception:
        # Still best-effort (callers get an empty mapping), but leave a trace of the failure.
        log.warning('Could not build the organizations dict for user %s', user_id, exc_info=True)
        return OrderedDict()
×
809

810

811
def hdx_organisation_list():
    '''
    Build the options for the organizations returned by
    ``h.organizations_available('create_dataset')``.

    :returns: a list of ``{'value': <name>, 'text': <title>}`` dicts
    :rtype: list
    '''
    options = []
    for org in h.organizations_available('create_dataset'):
        options.append({'value': org.get('name'), 'text': org.get('title')})
    return options
1✔
815

816

817
def hdx_tag_list():
    '''
    Build the tag options from the ``tag_list`` action for the current user.

    :returns: a list of ``{'value': tag, 'text': tag}`` dicts, or an empty list when
        ``c.user`` is not set
    :rtype: list
    '''
    if not c.user:
        return []

    context = {'model': model, 'session': model.Session, 'user': c.user}
    tag_names = logic.get_action('tag_list')(context, {})
    return [{'value': tag_name, 'text': tag_name} for tag_name in tag_names]
×
824

825

826
def hdx_dataset_preview_values_list():
    '''
    Build the options for the dataset preview field.

    :returns: a single-item list whose value is ``_DATASET_PREVIEW_FIRST_RESOURCE``
        from the ``custom_validator`` module
    :rtype: list
    '''
    # Imported inside the function, as in the original code.
    import ckanext.hdx_package.helpers.custom_validator as custom_validator
    default_option = {
        'value': custom_validator._DATASET_PREVIEW_FIRST_RESOURCE,
        'text': 'Default (first resource with preview)',
    }
    return [default_option]
1✔
830

831

832
def hdx_frequency_list(for_sysadmin=False, include_value=None):
    '''
    Build the update-frequency options from ``UPDATE_FREQ_INFO``.

    :param for_sysadmin: when truthy, return every option without filtering
    :type for_sysadmin: bool
    :param include_value: value of an option to keep even if it is flagged ``onlySysadmin``

    :returns: a list of ``{'value', 'text', 'onlySysadmin'}`` dicts starting with the
        "-- Please select --" placeholder (value ``'-999'``)
    :rtype: list
    '''
    options = [{'value': '-999', 'text': '-- Please select --', 'onlySysadmin': False}]
    for freq_value, freq_info in UPDATE_FREQ_INFO.items():
        options.append({'value': freq_value, 'text': freq_info['title'], 'onlySysadmin': False})

    if for_sysadmin:
        return options

    return [
        option for option in options
        if not option.get('onlySysadmin') or include_value == option.get('value')
    ]
1✔
844

845

846
def hdx_get_frequency_by_value(value):
    '''
    Look up the title of an update frequency in ``UPDATE_FREQ_INFO``.

    :param value: key to look up in ``UPDATE_FREQ_INFO``

    :returns: the ``title`` of the matching entry, or ``''`` when the key or title is missing
    '''
    frequency_info = UPDATE_FREQ_INFO.get(value, {})
    return frequency_info.get('title', '')
1✔
848

849

850
# def hdx_get_layer_info(id=None):
851
#     layer = explorer.explorer_data.get(id)
852
#     return layer
853

854

855
def hdx_get_carousel_list():
    '''
    Return the result of the ``hdx_carousel_settings_show`` action, called with
    ``max_items`` set to 3 in the context.
    '''
    carousel_settings_show = logic.get_action('hdx_carousel_settings_show')
    context = {'max_items': 3}
    return carousel_settings_show(context, {})
1✔
857

858

859
def hdx_get_quick_links_list(archived=None):
    '''
    Return the items of the ``hdx_quick_links_settings_show`` action.

    :param archived: when equal to ``True`` or ``False``, keep only the items whose
        ``archived`` flag (missing means ``False``) equals it; any other value disables
        the filtering
    :type archived: Optional[bool]

    :returns: the (possibly filtered) quick links
    '''
    quick_links = logic.get_action('hdx_quick_links_settings_show')({}, {})
    if archived not in (True, False):
        return quick_links
    return [link for link in quick_links if link.get('archived', False) == archived]
1✔
864

865

866
def _get_context():
    '''
    Build an action context dict from the model and the request-level ``c`` object.

    :returns: dict with ``model``, ``session``, ``user`` (``c.user`` or, if that is
        falsy, ``c.author``) and ``auth_user_obj``
    :rtype: dict
    '''
    context = {'model': model, 'session': model.Session}
    context['user'] = c.user or c.author
    context['auth_user_obj'] = c.userobj
    return context
873

874

875
def _get_action(action, data_dict):
    '''
    Call the named action with the context built by ``_get_context()``.

    :param action: name of the action to look up through ``toolkit.get_action``
    :type action: string
    :param data_dict: data dict handed to the action

    :returns: whatever the action returns
    '''
    action_function = toolkit.get_action(action)
    return action_function(_get_context(), data_dict)
1✔
877

878

879
def hdx_is_current_user_a_maintainer(maintainers, pkg):
    '''
    Tell whether the current user (``c.user``) appears in ``maintainers``.

    :param maintainers: collection checked for the current user's id or name
    :param pkg: not used by this function

    :returns: True when a user is logged in and their id or name is in ``maintainers``
    :rtype: bool
    '''
    if not c.user:
        return False

    user_dict = _get_action('user_show', {'id': c.user})
    identifiers = (user_dict.get('id'), user_dict.get('name'))

    if not maintainers:
        return False
    return any(identifier in maintainers for identifier in identifiers)
1✔
889

890

891
def hdx_organization_list_for_user(user_id):
    '''
    Return the result of the ``hdx_organization_list_for_user`` action for a user.

    :param user_id: passed as ``id`` to the action

    :returns: the action's result, or an empty list when ``user_id`` is falsy
    '''
    if not user_id:
        return []

    list_orgs_for_user = tk.get_action('hdx_organization_list_for_user')
    return list_orgs_for_user(_get_context(), {'id': user_id})
×
905

906

907
def hdx_dataset_is_hxl(tag_list):
    '''
    Tell whether a list of tag dicts contains the ``hxl`` tag.

    :param tag_list: tag dicts; a match needs both ``name`` and ``display_name`` equal to ``hxl``
    :type tag_list: list

    :rtype: bool
    '''
    hxl_tag = 'hxl'
    return any(
        tag.get('name') == hxl_tag and tag.get('display_name') == hxl_tag
        for tag in tag_list
    )
1✔
912

913

914
def hdx_dataset_has_sadd(tag_list):
    '''
    Tell whether a list of tag dicts contains the
    ``sex and age disaggregated data - sadd`` tag.

    :param tag_list: tag dicts; a match needs both ``name`` and ``display_name`` equal to the tag
    :type tag_list: list

    :rtype: bool
    '''
    sadd_tag = 'sex and age disaggregated data - sadd'
    return any(
        tag.get('name') == sadd_tag and tag.get('display_name') == sadd_tag
        for tag in tag_list
    )
×
920

921

922
def hdx_switch_url_path():
    '''
    Helper wrapper that returns the result of ``switch_url_path()`` from
    ``ckanext.hdx_theme.util.light_redirect``.
    '''
    switched_path = switch_url_path()
    return switched_path
×
924

925

926
def hdx_munge_title(title):
    '''
    Helper wrapper around ``munge.munge_title_to_name``.

    :param title: the title to convert

    :returns: whatever ``munge.munge_title_to_name(title)`` returns
    '''
    munged_name = munge.munge_title_to_name(title)
    return munged_name
1✔
928

929

930
def hdx_check_http_response(response_code, comparison_http_code):
    '''
    :param response_code: code generated by the python APP (can come from flask or pylons in different types)
    :type response_code: Union[int, str, list]
    :param comparison_http_code: what we're comparing against
    :type comparison_http_code: int

    :returns: whether the response_code matches the comparison_http_code
    :rtype: bool
    '''
    try:
        if response_code == comparison_http_code:
            return True
        if response_code == str(comparison_http_code):
            return True
        # Sized values (e.g. a list) match on their first element. The emptiness check
        # keeps '' and [] from raising IndexError on the [0] lookup.
        if hasattr(response_code, '__len__') and len(response_code) > 0 \
                and response_code[0] == comparison_http_code:
            return True
    except (TypeError, IndexError, KeyError) as e:
        # IndexError/KeyError cover sized containers that cannot be read at position 0.
        log.info('Following error was generated from hdx_check_http_response():' + text_type(e))
    return False
1✔
950

951

952
def hdx_get_request_param(param_name, default_value):
    '''
    Read a parameter from ``request.args``, falling back to a default.

    :param param_name: name of the request parameter
    :type param_name: string
    :param default_value: returned when the parameter is missing or cannot be read

    :returns: the parameter value, or ``default_value`` when it is ``None``
    '''
    try:
        value = request.args.get(param_name)
    except Exception as e:
        log.warning('Error when looking into "args" of request. This could be normal in a pylons request: '
                    + text_type(e))
        # Retrying the same lookup here would only raise again from inside the handler,
        # so fall back to the default instead.
        value = None

    return default_value if value is None else value
1✔
965

966

967
def hdx_url_for(*args, **kw):
    '''
    Wrapper around ``tk.url_for()`` that drops the '/' at the end of the returned URL
    (or the '/' right before the query string).

    It appears when url_for() thinks it can return a flask route. But if it's a pylons
    controller that needs to render the page the '/' gets in the way.

    :returns: the URL without the trailing '/'; URLs that are empty or one character
        long are returned unchanged
    '''
    has_named_route = len(args) > 0 and args[0]
    if has_named_route:
        # The template gave both a named route and controller + action: the named route wins.
        for redundant_key in ('controller', 'action'):
            kw.pop(redundant_key, None)
    else:
        args = []

    url = tk.url_for(*args, **kw)
    if not url or len(url) <= 1:
        return url
    if url.endswith('/'):
        return url[:-1]
    if '/?' in url:
        return url.replace('/?', '?')
    return url
1✔
988

989

990
# Module-level alias: ``url_for`` in this module refers to ``hdx_url_for``.
url_for = hdx_url_for
1✔
991

992

993
def hdx_pending_request_data(user_id, package_id):
    '''
    Return the result of ``ckanextRequestdata.get_pending_requests``.

    Note that the model method takes the package id first and the user id second,
    the reverse of this helper's parameter order.
    '''
    requestdata_cls = requestdata_model.ckanextRequestdata
    return requestdata_cls.get_pending_requests(package_id, user_id)
1✔
995

996

997
def hdx_dataset_is_p_coded(resource_list):
    '''
    Tell whether at least one resource dict has a truthy ``p_coded`` value.

    :param resource_list: resource dicts to inspect
    :type resource_list: list

    :rtype: bool
    '''
    return any(resource.get('p_coded') for resource in resource_list)
×
1002

1003

1004
def hdx_get_allowed_tags_list():
    '''
    Build the tag options the current user may use, from the
    ``cached_approved_tags_list`` action.

    Only sysadmins are allowed to use tags starting with "crisis-".

    :returns: a list of ``{'value': tag, 'text': tag}`` dicts
    :rtype: list
    '''
    context = {'model': model, 'session': model.Session, 'user': c.user}
    approved_tags = logic.get_action('cached_approved_tags_list')(context, {})
    is_sysadmin = new_authz.is_sysadmin(c.user)

    return [
        {'value': tag, 'text': tag}
        for tag in approved_tags
        if not (tag.startswith('crisis-') and not is_sysadmin)
    ]
1✔
1020

1021

1022
def bs5_build_nav_icon(menu_item, title, **kw):
    '''Build a navigation item used for example in ``user/read_base.html``.

    Thin wrapper that delegates to ``_bs5_make_menu_item``, which wraps the link in
    ``<li class="nav-item">...</li>``.

    :param menu_item: the name of the defined menu item defined in
      config/routing as the named route of the same name
    :type menu_item: string
    :param title: text used for the link
    :type title: string
    :param kw: additional keywords needed for creating url eg ``id=...``

    :rtype: HTML literal

    '''
    nav_item = _bs5_make_menu_item(menu_item, title, **kw)
    return nav_item
1✔
1038

1039

1040
def _bs5_make_menu_item(menu_item, title, **kw):
    ''' Build a navigation item: a link wrapped in ``<li class="nav-item">...</li>``.

    :param menu_item: the name of the defined menu item defined in
    config/routing as the named route of the same name; it is split on '.' into
    controller and action
    :type menu_item: string
    :param title: text used for the link
    :type title: string
    :param **kw: additional keywords needed for creating url eg id=...

    :rtype: HTML literal

    This function is called by wrapper functions.
    '''
    controller, action = menu_item.split('.')
    link_kwargs = {'action': action, 'controller': controller, **kw}
    # Remove highlight controllers so that they won't appear in generated urls.
    link_kwargs.pop('highlight_controllers', False)

    link = h._link_to(title, menu_item, suppress_active_class=False, **link_kwargs)
    opening_tag = h.literal('<li class="nav-item">')
    closing_tag = h.literal('</li>')
    return opening_tag + link + closing_tag
1✔
1066

1067

1068
def hdx_decode_markup(value):
    '''
    Decode a markup value holding a JSON-like payload.

    The value is unescaped, its single quotes are replaced with double quotes and the
    result is parsed as JSON.

    :param value: object exposing ``unescape()``

    :returns: for a dict payload, all its messages stripped of surrounding whitespace and
        trailing dots, joined with '. ' and ending with '.'; for any other payload, the
        decoded value; the original ``value`` if anything fails
    '''
    try:
        decoded = json.loads(value.unescape().replace("'", '"'))
        if not isinstance(decoded, dict):
            return decoded

        messages = [
            message.strip().rstrip('.')
            for field_messages in decoded.values()
            for message in field_messages
        ]
        return '. '.join(messages) + '.'
    except Exception:
        return value
×
1084

1085
def hdx_generate_basemap_config_string() -> str:
    '''
    Serialize the basemap settings to a JSON string.

    :returns: JSON object with ``baseMapUrl`` and ``token``, read from the
        ``hdx.mapbox.baselayer.url`` and ``hdx.mapbox.baselayer.token`` config options
    :rtype: str
    '''
    option_by_field = (
        ('baseMapUrl', 'hdx.mapbox.baselayer.url'),
        ('token', 'hdx.mapbox.baselayer.token'),
    )
    basemap_settings = {field: config.get(option) for field, option in option_by_field}
    return json.dumps(basemap_settings)
×
1091

1092

1093
def hdx_supports_notifications(object_type: Union[ObjectType, str], object_id: str,
                               object_dict: Optional[dict[str, Any]] = None) -> bool:
    '''
    Tell whether notifications are supported for an object.

    :param object_type: an ``ObjectType`` or a string that is converted to one
    :param object_id: id of the object; a falsy id is rejected
    :param object_dict: optional dict of the object, forwarded to
        ``_check_notifications_enabled_for_object``

    :returns: False for an invalid type string, a falsy id or a type other than
        DATASET, GROUP, ORGANIZATION or CRISIS (each case is logged as an error);
        otherwise the result of ``_check_notifications_enabled_for_object``
    :rtype: bool
    '''
    if isinstance(object_type, str):
        # Strings are converted to the enum; unknown strings are rejected.
        try:
            object_type = ObjectType(object_type)
        except ValueError:
            log.error(f'Invalid string for object_type: {object_type}')
            return False

    log.info(f'Checking if notifications are supported for object_id: {object_id} of type: {object_type}')

    if not object_id:
        log.error(f'Invalid object_id: {object_id}')
        return False

    supported_types = (ObjectType.DATASET, ObjectType.GROUP, ObjectType.ORGANIZATION, ObjectType.CRISIS)
    if object_type not in supported_types:
        log.error(f'Invalid object_type: {object_type}')
        return False

    return _check_notifications_enabled_for_object(object_type, object_id, object_dict)
1✔
1116

1117

1118
def _check_notifications_enabled_for_object(object_type: ObjectType, object_id: str,
                                            object_dict: Optional[dict[str, Any]] = None) -> bool:
    '''
    Decide whether notifications are enabled for one object.

    The object is first screened through its dict (when one is given):

    * datasets are rejected when ``is_requestdata_type``, ``private`` or ``archived`` is
      true, or when ``data_update_frequency`` equals ``UPDATE_FREQ_NEVER``
    * crises are rejected when ``status`` is ``archived`` or when no section has the
      type ``data_list`` (``sections`` given as a string is parsed as JSON)

    Then the ``<type value>_<id>`` identifier is checked against the cached list of
    enabled objects if ``hdx.notifications.enabled_objects_csv`` is configured, else
    against the cached list of disabled objects if
    ``hdx.notifications.disabled_objects_csv`` is configured.

    :returns: whether notifications are enabled; False when neither option is configured
    :rtype: bool
    '''
    object_identifier = f"{object_type.value}_{object_id}"
    log.info(f'Checking notifications for object: {object_identifier}')

    disabled_by_properties = False

    if object_type == ObjectType.DATASET and object_dict:
        def _flag_is_true(field_name):
            # Values are compared as text, so both True and 'true' count.
            return str(object_dict.get(field_name, False)).lower() == 'true'

        is_hdx_connect = _flag_is_true('is_requestdata_type')
        is_private = _flag_is_true('private')
        is_archived = _flag_is_true('archived')
        is_update_frequency_never = object_dict.get('data_update_frequency', '') == UPDATE_FREQ_NEVER

        log.debug(f'Object properties - is_hdx_connect: {is_hdx_connect}, is_private: {is_private}, '
                  f'is_archived: {is_archived}, is_update_frequency_never: {is_update_frequency_never}')

        disabled_by_properties = is_hdx_connect or is_private or is_archived or is_update_frequency_never
    elif object_type == ObjectType.CRISIS and object_dict:
        is_archived = object_dict.get('status') == 'archived'
        sections = object_dict.get('sections', [])
        if isinstance(sections, str):
            sections = json.loads(sections)
        has_data_list = any(section.get('type') == 'data_list' for section in sections)

        log.debug(f'Object properties - is_archived: {is_archived}, has_data_list: {has_data_list}')

        disabled_by_properties = is_archived or not has_data_list

    if disabled_by_properties:
        log.info(f'Notifications disabled for object: {object_identifier}')
        return False

    if config.get('hdx.notifications.enabled_objects_csv'):
        log.info('Using cache for enabled objects for notifications')
        return object_identifier in cached_objects_with_notifications()

    if config.get('hdx.notifications.disabled_objects_csv'):
        log.info('Using cache for disabled objects for notifications')
        return object_identifier not in cached_objects_without_notifications()

    log.info(f'No notifications configuration found for object: {object_identifier}')
    return False
1✔
1159

1160

1161
def facet_url_extra_args(facet_list, request_args):
    '''
    Collect the URL arguments for the currently selected facet items.

    :param facet_list: mapping of facet id to facet dict; each facet may hold ``items``,
        and the items with ``selected`` set to ``True`` are collected
    :type facet_list: dict
    :param request_args: mapping from which ``q``, ``sort``, ``ext_page_size`` and
        ``ext_archived`` are preserved when they have a truthy value

    :returns: mapping of argument name to a list of values; per facet the key is the
        ``category_key`` of its last selected item
    :rtype: dict
    '''
    extra_args = {}

    for facet in facet_list.values():
        selected_names = []
        last_category_key = 'KeyError'

        for item in facet.get('items', []):
            if item.get('selected') is not True:
                continue

            category_key = item.get('category_key')
            if category_key == 'cod_level' and item.get('name') == 'ALL':
                # Flatten sub-items if 'ALL' is selected
                selected_names.extend([sub_item['name'] for sub_item in item.get('items', [])])
            else:
                selected_names.append(item.get('name'))
            last_category_key = category_key

        if selected_names:
            extra_args[last_category_key] = selected_names

    # Also include preserved request arguments
    for preserved_key in ('q', 'sort', 'ext_page_size', 'ext_archived'):
        preserved_value = request_args.get(preserved_key)
        if preserved_value:
            extra_args[preserved_key] = (
                preserved_value if isinstance(preserved_value, list) else [preserved_value]
            )

    return extra_args
1✔
1190

1191

1192
def build_facet_filter_url(option, extra_args):
    '''
    Build the URL (on the current ``request.path``) that toggles one facet option.

    A selected option is removed from the values of its category, an unselected one is
    added. For the ``ALL`` option of ``cod_level`` / ``vocab_Topics`` the names of its
    sub-items are toggled instead of the option's own name.

    :param option: facet option dict (``category_key``, ``name``, ``selected``, ``items``)
    :type option: dict
    :param extra_args: mapping of argument name to list of values describing the current
        selection; it is not modified, so the same mapping can be reused for every option
    :type extra_args: dict

    :returns: the path followed by the url-encoded query string, or just the path when
        no argument is left
    :rtype: str
    '''
    category_key = option.get('category_key')

    if category_key in {'cod_level', 'vocab_Topics'} and option.get('name') == 'ALL':
        toggled_values = [sub_item.get('name') for sub_item in option.get('items', [])]
    else:
        toggled_values = [option.get('name')]

    current_values = list(extra_args.get(category_key, []))
    if option.get('selected'):
        updated_values = [value for value in current_values if value not in toggled_values]
    else:
        updated_values = current_values + [value for value in toggled_values if value not in current_values]

    # Work on a copy: writing into the caller's dict would leak this option's toggle
    # into the URLs built afterwards from the same mapping.
    url_args = dict(extra_args)
    url_args[category_key] = updated_values

    query_parts = [(key, value) for key, values in url_args.items() for value in values]

    base_path = request.path
    if query_parts:
        return f"{base_path}?{urlencode(query_parts)}"
    return base_path
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc