• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OCHA-DAP / hdx-ckan / #6405

13 Jun 2025 01:08PM UTC coverage: 74.954% (+0.02%) from 74.933%
#6405

Pull #6645

coveralls-python

ccataalin
HDX-10666 fix wrong value for object type and name
Pull Request #6645: PR: HDX-10666 & HDX-10877: notifications hub and unsubscribe logic

8 of 13 new or added lines in 5 files covered. (61.54%)

4 existing lines in 4 files now uncovered.

12934 of 17256 relevant lines covered (74.95%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.84
/ckanext-hdx_package/ckanext/hdx_package/views/light_dataset.py
1
from flask import Blueprint
1✔
2

3
import ckan.model as model
1✔
4
import ckan.plugins.toolkit as tk
1✔
5

6
from ckan.common import g
1✔
7
from ckan.types import DataDict, Request
1✔
8
import ckan.logic as logic
1✔
9

10
import ckanext.hdx_package.helpers.analytics as analytics
1✔
11
import ckanext.hdx_package.helpers.custom_pages as cp_h
1✔
12
from ckanext.hdx_search.controller_logic.search_logic import SearchLogic, ArchivedUrlHelper
1✔
13
from ckanext.hdx_theme.util.http_exception_helper import catch_http_exceptions
1✔
14
from ckanext.hdx_theme.util.light_redirect import check_redirect_needed
1✔
15
from ckanext.hdx_users.controller_logic.notification_platform_logic import verify_unsubscribe_token
1✔
16
from ckanext.hdx_users.helpers.notification_platform import check_notifications_enabled_for_dataset
1✔
17

18
get_action = tk.get_action
1✔
19
check_access = tk.check_access
1✔
20
render = tk.render
1✔
21
abort = tk.abort
1✔
22
_ = tk._
1✔
23

24
NotAuthorized = tk.NotAuthorized
1✔
25
NotFound = logic.NotFound
1✔
26

27
hdx_light_dataset = Blueprint(u'hdx_light_dataset', __name__, url_prefix=u'/m/dataset')
1✔
28
hdx_light_search = Blueprint(u'hdx_light_search', __name__, url_prefix=u'/m/search')
1✔
29

30

31
def _get_org_extras(org_id):
1✔
32
    """
33
    Get the extras for our orgs
34
    """
35
    if not org_id:
1✔
36
        return {}
×
37
    context = {'model': model, 'session': model.Session,
1✔
38
               'user': g.user or g.author,
39
               'include_datasets': False,
40
               'for_view': True}
41
    data_dict = {'id': org_id}
1✔
42
    org_info = get_action(
1✔
43
        'hdx_light_group_show')(context, data_dict)
44

45
    extras_dict = {item['key']: item['value'] for item in org_info.get('extras', {})}
1✔
46
    extras_dict['image_url'] = org_info.get('image_url', None)
1✔
47

48
    return extras_dict
1✔
49

50
@check_redirect_needed
1✔
51
@catch_http_exceptions
1✔
52
def read(id):
1✔
53
    context = {
1✔
54
        u'model': model,
55
        u'session': model.Session,
56
        u'user': g.user,
57
        u'auth_user_obj': g.userobj,
58
        u'for_view': True
59
    }
60
    data_dict = {
1✔
61
        u'id': id
62
    }
63

64
    dataset_dict = get_action('package_show')(context, data_dict)
1✔
65
    org_dict = dataset_dict.get('organization') or {}
1✔
66
    org_id = org_dict.get('id', None)
1✔
67
    org_info_dict = _get_org_extras(org_id)
1✔
68
    user_survey_url = org_info_dict.get('user_survey_url')
1✔
69
    if dataset_dict.get('type') == 'dataset':
1✔
70
        analytics_dict = _compute_analytics(dataset_dict, tk.request)
1✔
71
        dataset_dict['page_list'] = cp_h.hdx_get_page_list_for_dataset(context, dataset_dict)
1✔
72
        dataset_dict['link_list'] = get_action('hdx_package_links_by_id_list')(context, {'id': dataset_dict.get('name')})
1✔
73

74
        # notification platform
75
        unsubscribe_token = tk.request.args.get('unsubscribe_token', None)
1✔
76
        if unsubscribe_token:
1✔
UNCOV
77
            try:
×
78
                token_obj = verify_unsubscribe_token(unsubscribe_token, inactivate=False)
×
79
            except Exception as e:
×
80
                unsubscribe_token = None
×
81
                tk.h.flash_error('Your token is invalid or has expired.')
×
82

83
        template_data = {
1✔
84
            'dataset_dict': dataset_dict,
85
            'analytics': analytics_dict,
86
            'user_survey_url': user_survey_url,
87
            'unsubscribe_token': unsubscribe_token,
88
        }
89

90
        return render(u'light/dataset/read.html', template_data)
1✔
91
    else:
92
        raise NotFound
×
93

94

95
@check_redirect_needed
1✔
96
def search():
1✔
97
    return generic_search(u'light/search/search.html')
1✔
98

99

100
def generic_search(html_template):
1✔
101
    search_logic = SearchLogic()
1✔
102

103
    try:
1✔
104
        search_logic._search(use_solr_collapse=True)
1✔
105
    except NotFound as ex:
×
106
        abort(404, _('Page not found'))
×
107

108
    archived_url_helper = search_logic.add_archived_url_helper()
1✔
109
    redirect_result = archived_url_helper.redirect_if_needed()
1✔
110
    if redirect_result:
1✔
111
        return redirect_result
×
112

113
    data_dict = {'data': search_logic.template_data}
1✔
114
    return render(html_template, data_dict)
1✔
115

116

117
def _compute_analytics(dataset_dict: DataDict, request: Request):
1✔
118
    result = {
1✔
119
        'is_cod': analytics.is_cod(dataset_dict),
120
        'is_indicator': analytics.is_indicator(dataset_dict),
121
        'is_archived': analytics.is_archived(dataset_dict),
122
        'analytics_group_names': (analytics.extract_locations_in_json(dataset_dict))[0],
123
        'analytics_group_ids': (analytics.extract_locations_in_json(dataset_dict))[1],
124
        'analytics_dataset_availability': analytics.dataset_availability(dataset_dict),
125
        'analytics_came_from': analytics.came_from(request.args),
126
        'analytics_supports_notifications': analytics.supports_notifications(dataset_dict),
127
    }
128
    return result
1✔
129

130

131
hdx_light_search.add_url_rule(u'/', view_func=search, strict_slashes=False)
1✔
132
hdx_light_dataset.add_url_rule(u'/', view_func=search, strict_slashes=False)
1✔
133
hdx_light_dataset.add_url_rule(u'/<id>', view_func=read)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc