• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OCHA-DAP / hdx-ckan / #5935

05 Dec 2024 10:26AM UTC coverage: 74.957% (+0.07%) from 74.883%
#5935

Pull #6496

coveralls-python

danmihaila
HDX-10398 add testing
Pull Request #6496: HDX-10398 export data completeness and rename "completness"

54 of 65 new or added lines in 4 files covered. (83.08%)

62 existing lines in 3 files now uncovered.

12562 of 16759 relevant lines covered (74.96%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.72
/ckanext-hdx_org_group/ckanext/hdx_org_group/actions/get.py
1
"""
2
Created on April 24, 2015
3

4
@author: alexandru-m-g
5
"""
6
import json
1✔
7
import logging
1✔
8

9
import ckan.lib.dictization as d
1✔
10
import ckan.lib.helpers as helpers
1✔
11
import ckan.lib.navl.dictization_functions
1✔
12
import ckan.model as model
1✔
13
import ckan.plugins.toolkit as tk
1✔
14
import ckanext.hdx_org_group.dao.indicator_access as indicator_access
1✔
15
import ckanext.hdx_org_group.dao.widget_data_service as widget_data_service
1✔
16
import ckanext.hdx_org_group.helpers.country_helper as country_helper
1✔
17
import ckanext.hdx_org_group.helpers.organization_helper as org_helper
1✔
18
from ckan.common import c
1✔
19
from ckanext.hdx_theme.helpers.caching import cached_make_rest_api_request as cached_make_rest_api_request
1✔
20

21
_validate = ckan.lib.navl.dictization_functions.validate
1✔
22

23
log = logging.getLogger(__name__)
1✔
24

25
config = tk.config
1✔
26
get_action = tk.get_action
1✔
27
_check_access = tk.check_access
1✔
28
_get_or_bust = tk.get_or_bust
1✔
29
NotFound = tk.ObjectNotFound
1✔
30
ValidationError = tk.ValidationError
1✔
31
side_effect_free = tk.side_effect_free
1✔
32

33
IndicatorAccess = indicator_access.IndicatorAccess
1✔
34

35

36
@side_effect_free
1✔
37
def hdx_datasets_for_group(context, data_dict):
1✔
38
    """
39
    Returns a paginated list of datasets for a group with 25 items per page.
40
    Options for sorting are: metadata_modified desc, title_case_insensitive desc, title_case_insensitive asc,
41
    views_recent desc, score desc ( only useful if query string is specified, should be combined
42
    with metadata_modified desc )
43
    :param id: the id of the group for which datasets are requested
44
    :type id: string
45
    :param page: page number starting from 1
46
    :type page: int
47
    :param sort: the field by which the datasets should be sorted. Defaults to 'metadata_modified desc'
48
    :type sort: string
49
    :param q: query string
50
    :type q: string
51
    :param type: 'all', 'indicators', 'datasets'. Defaults to 'all'
52
    :type q: string
53
    :return:
54
    """
55

56
    skipped_keys = ['q', 'id', 'sort', 'type', 'page']
×
57

NEW
58
    id = _get_or_bust(data_dict, 'id')
×
59

60
    limit = 25
×
61

62
    sort_option = data_dict.get('sort', 'metadata_modified desc')
×
63

64
    page = int(data_dict.get('page', 1))
×
65
    new_data_dict = {'sort': sort_option,
×
66
                     'rows': limit,
67
                     'start': (page - 1) * limit,
68
                     }
69
    type = data_dict.get('type', None)
×
70
    if type == 'indicators':
×
71
        new_data_dict['ext_indicator'] = u'1'
×
72
    elif type == 'datasets':
×
73
        new_data_dict['ext_indicator'] = u'0'
×
74

75
    search_param_list = [
×
76
        key + ':' + value for key, value in data_dict.iteritems() if key not in skipped_keys]
77
    search_param_list.append(u'groups:{}'.format(id))
×
78

79
    if search_param_list != None:
×
NEW
80
        new_data_dict['fq'] = ' '.join(
×
81
            search_param_list) + ' +dataset_type:dataset'
82

83
    if data_dict.get('q', None):
×
84
        new_data_dict['q'] = data_dict['q']
×
85

NEW
86
    query = get_action('package_search')(context, new_data_dict)
×
87

88
    return query
×
89

90

91
@side_effect_free
1✔
92
def hdx_topline_num_for_group(context, data_dict):
1✔
93
    """
94
    :param id: the id of the group for which top line numbers are requested
95
    :type id: string
96
    :return: a dict of top line numbers. Please note that depending on the selected group the source
97
    of the data ( either the datastore or CPS/indicators ) might be different. The data will have some fields
98
     that are specific to the source.
99
    """
100
    id = _get_or_bust(data_dict, 'id')
1✔
101
    grp_result = get_group(id)
1✔
102
    group_info = grp_result.get('group_info')
1✔
103
    # custom_dict = grp_result.get('custom_dict')
104

105
    # datastore_id = custom_dict.get('topline_resource', None)
106

107
    common_format = data_dict.get('common_format', True) not in ['false', '0']  # type: bool
1✔
108

109
    # if group_info.get('custom_loc', False) and datastore_id:
110
    #     # source is datastore
111
    #     crisis_data_access = location_data_access.LocationDataAccess(datastore_id)
112
    #     crisis_data_access.fetch_data(context)
113
    #     top_line_items = crisis_data_access.get_top_line_items()
114
    #     for item in top_line_items:
115
    #         item['source_system'] = 'datastore'
116
    #         del item['units']
117
    if group_info.get('activity_level') == 'active':
1✔
118
        top_line_items = __get_toplines_for_active_country(group_info, common_format)
×
119
    else:
120
        top_line_items = __get_toplines_for_standard_country(group_info, common_format)
1✔
121

122
    return top_line_items
1✔
123

124

125
def __get_toplines_for_active_country(group_info, common_format):
1✔
126
    """
127
    :param group_info:
128
    :type group_info: dict
129
    :param common_format:
130
    :type common_format: bool
131
    :return:
132
    :rtype: list
133
    """
134

135
    # source is rw
136
    top_line_data_list = widget_data_service.build_widget_data_access(group_info).get_dataset_results()
×
137
    if common_format:
×
138
        def _parse_integer_value(item):
×
139
            try:
×
140
                value = float(item.get('value', ''))
×
141
            except:
×
142
                value = None
×
143
            return value
×
144

145
        top_line_items = [
×
146
            {
147
                'source_system': 'reliefweb crisis app',
148
                'code': item.get('indicatorTypeCode', ''),
149
                'title': item.get('indicatorTypeName', ''),
150
                'source_link': item.get('datasetLink', ''),
151
                'source': item.get('sourceName', ''),
152
                'value': _parse_integer_value(item),
153
                'latest_date': item.get('time', ''),
154
                'units': item.get('units', )
155
            }
156
            for item in top_line_data_list
157
        ]
158
    else:
159
        top_line_items = top_line_data_list
×
160
    return top_line_items
×
161

162

163
def __get_toplines_for_standard_country(group_info, common_format):
1✔
164
    """
165
    :param group_info:
166
    :type group_info: dict
167
    :param common_format:
168
    :type common_format: bool
169
    :return:
170
    :rtype: list
171
    """
172
    # source is configured in 'hdx.locations.toplines_url'
173
    # ckan_site_url = config.get('ckan.site_url')
174
    raw_top_line_items = widget_data_service.build_widget_data_access(group_info).get_dataset_results()
1✔
175
    # ckan_data = indicator_dao.fetch_indicator_data_from_ckan()
176
    top_line_items = []
1✔
177
    if common_format:
1✔
178
        for item in raw_top_line_items:
×
179
            code = item.get('indicatorTypeCode', '')
×
180
            title = item.get('indicatorTypeName', '')
×
181
            new_item = {
×
182
                'source_system': 'cps',
183
                'code': code or title,
184
                'title': title or code,
185
                # 'source_link': ckan_site_url + ckan_data.get(code, {}).get('datasetLink', ''),
186
                'source': item.get('sourceName', ''),
187
                'value': item.get('value', ''),
188
                'latest_date': item.get('time', ''),
189
                'units': item.get('unitName', )
190
            }
191
            top_line_items.append(new_item)
×
192
    else:
193
        top_line_items = raw_top_line_items
1✔
194
    return top_line_items
1✔
195

196

197
@side_effect_free
1✔
198
def hdx_light_group_show(context, data_dict):
1✔
199
    """
200
    Return a lightweight ( less resource intensive,faster but without datasets ) version of the group details
201
    :param id: the id of the group for which top line numbers are requested
202
    :type id: string
203
    """
204

205
    id = _get_or_bust(data_dict, 'id')
1✔
206
    group_dict = {}
1✔
207
    group = model.Group.get(id)
1✔
208
    if not group:
1✔
209
        raise NotFound
×
210
    if group.state == 'deleted' and (not c.userobj or not c.userobj.sysadmin):
1✔
211
        raise NotFound
×
212
    # group_dict['group'] = group
213
    group_dict['id'] = group.id
1✔
214
    group_dict['name'] = group.name
1✔
215
    group_dict['image_url'] = group.image_url
1✔
216
    group_dict['display_name'] = group_dict['title'] = group.title
1✔
217
    group_dict['description'] = group.description
1✔
218
    # group_dict['revision_id'] = group.revision_id
219
    group_dict['state'] = group.state
1✔
220
    group_dict['created'] = group.created
1✔
221
    group_dict['type'] = group.type
1✔
222

223
    result_list = []
1✔
224
    for name, extra in group._extras.items():
1✔
225
        dictized = d.table_dictize(extra, context)
1✔
226
        if not extra.state == 'active':
1✔
227
            continue
×
228
        value = dictized['value']
1✔
229
        result_list.append(dictized)
1✔
230

231
        # Keeping the above for backwards compatibility
232
        group_dict[name] = dictized['value']
1✔
233

234
    group_dict['extras'] = sorted(result_list, key=lambda x: x['key'])
1✔
235
    return group_dict
1✔
236

237

238
def get_group(id):
1✔
239
    context = {'model': model, 'session': model.Session,
1✔
240
               'include_datasets': False,
241
               'for_view': True}
242
    data_dict = {'id': id}
1✔
243

244
    group_info = get_action('hdx_light_group_show')(context, data_dict)
1✔
245

246
    extras_dict = {item['key']: item['value'] for item in group_info.get('extras', {})}
1✔
247
    json_string = extras_dict.get('customization', None)
1✔
248
    if json_string:
1✔
249
        custom_dict = json.loads(json_string)
×
250
    else:
251
        custom_dict = {}
1✔
252

253
    return {'group_info': group_info, 'custom_dict': custom_dict}
1✔
254

255

256
@side_effect_free
1✔
257
def hdx_trigger_screencap(context, data_dict):
1✔
258
    cfg = context['cfg']
×
259
    file_path = context['file_path']
×
260
    # checking if user is sysadmin
261
    sysadmin = False
×
262
    if data_dict.get('reset_thumbnails', 'false') == 'true':
×
263
        try:
×
264
            _check_access('hdx_trigger_screencap', context, data_dict)
×
265
            sysadmin = True
×
266
        except:
×
267
            return False
×
268
    if not sysadmin and not context.get('reset', False):
×
269
        return False
×
270
    if not cfg['screen_cap_asset_selector']:  # If there's no selector set just don't bother
×
271
        return False
×
272

273
    return org_helper.hdx_capturejs(config['ckan.site_url'] + helpers.url_for('organization_read', id=cfg['org_name']),
×
274
                                    file_path, cfg['screen_cap_asset_selector'])
275

276

277
@side_effect_free
1✔
278
def hdx_get_locations_info_from_rw(context, data_dict):
1✔
279
    try:
×
280
        url = data_dict.get('rw_url')
×
281
        if url:
×
282
            return cached_make_rest_api_request(url)
×
283
        return None
×
284
    except:
×
NEW
285
        log.error('RW file was not found or can not be accessed')
×
286
        return None
×
287

288

289
@side_effect_free
1✔
290
def hdx_organization_follower_list(context, data_dict):
1✔
291
    """Return the list of users that are following the given organization.
292

293
    :param id: the id or name of the organization
294
    :type id: string
295

296
    :rtype: list of dictionaries
297

298
    """
299
    _check_access('hdx_organization_follower_list', context, data_dict)
×
300
    context['keep_email'] = True
×
301
    return _follower_list(
×
302
        context, data_dict,
303
        ckan.logic.schema.default_follow_group_schema(),
304
        context['model'].UserFollowingGroup)
305

306

307
def _follower_list(context, data_dict, default_schema, FollowerClass):
1✔
308
    schema = context.get('schema', default_schema)
×
309
    data_dict, errors = _validate(data_dict, schema, context)
×
310
    if errors:
×
311
        raise ValidationError(errors)
×
312

313
    # Get the list of Follower objects.
314
    model = context['model']
×
315
    object_id = data_dict.get('id')
×
316
    followers = FollowerClass.follower_list(object_id)
×
317

318
    # Convert the list of Follower objects to a list of User objects.
319
    users = [model.User.get(follower.follower_id) for follower in followers]
×
320
    users = [user for user in users if user is not None]
×
321

322
    # Dictize the list of User objects.
323
    return _user_list_dictize(users, context)
×
324

325

326
def _user_list_dictize(obj_list, context,
1✔
327
                       sort_key=lambda x: x['name'], reverse=False):
328
    import ckan.lib.dictization.model_dictize as model_dictize
×
329
    result_list = []
×
330

331
    for obj in obj_list:
×
332
        user_dict = model_dictize.user_dictize(obj, context)
×
333
        user_dict.pop('reset_key', None)
×
334
        user_dict.pop('apikey', None)
×
335
        # user_dict.pop('email', None)
336
        result_list.append(user_dict)
×
337
    return sorted(result_list, key=sort_key, reverse=reverse)
×
338

339
@side_effect_free
1✔
340
def hdx_datagrid_show(context, data_dict):
1✔
341
    id = _get_or_bust(data_dict, 'id')
1✔
342
    if id:
1✔
343
        grp_dict = get_action('hdx_light_group_show')(context, {'id':id})
1✔
344
        data_completeness = country_helper._get_data_completeness(grp_dict.get('name'))
1✔
345
        replaced_data_completeness = country_helper.hdx_replace_datagrid_labels(data_completeness, grp_dict)
1✔
346
        return replaced_data_completeness
1✔
347
    else:
NEW
348
        raise NotFound('Group was not found.')
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc