• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OCHA-DAP / hdx-ckan / #5218

11 Apr 2024 11:29AM UTC coverage: 71.375% (+0.06%) from 71.316%
#5218

Pull #6302

coveralls-python

danmihaila
HDX-9444 HDX-9443 HDX-9442 review email subject
Pull Request #6302: Feature/dev org join hdx 9447

98 of 110 new or added lines in 10 files covered. (89.09%)

14 existing lines in 2 files now uncovered.

11667 of 16346 relevant lines covered (71.38%)

0.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

31.15
/ckanext-hdx_users/ckanext/hdx_users/views/user_onboarding_view.py
1
import json
1✔
2
import logging as logging
1✔
3

4
import ckan.lib.navl.dictization_functions as dictization_functions
1✔
5
import ckan.logic as logic
1✔
6
import ckan.plugins.toolkit as tk
1✔
7

8
import ckanext.hdx_theme.util.mail as hdx_mail
1✔
9
import ckanext.hdx_users.helpers.mailer as hdx_mailer
1✔
10
import ckanext.hdx_users.model as user_model
1✔
11

12
from ckan import model
1✔
13
from ckanext.hdx_users.views.user_view_helper import (
1✔
14
    OnbNotAuth, OnbUserNotFound, OnbIntegrityErr, OnbSuccess, error_message
15
)
16

17

18
# Short module-level aliases for the CKAN plugins toolkit (imported as ``tk``).
abort, render = tk.abort, tk.render
g, request, config = tk.g, tk.request, tk.config
h = tk.h
_ = tk._

# Lookup helpers for auth functions, actions and validators.
_check_access = tk.check_access
_get_action = tk.get_action
_get_validator = tk.get_validator
unicode_safe = _get_validator('unicode_safe')

log = logging.getLogger(__name__)

# Helpers re-exported from ckan.lib.navl.dictization_functions.
unflatten = dictization_functions.unflatten
_validate = dictization_functions.validate
DataError = dictization_functions.DataError

# Exception types caught by the view handlers below.
NotFound = tk.ObjectNotFound
NotAuthorized = tk.NotAuthorized
ValidationError = tk.ValidationError
39

40

41
class HDXUserOnboardingView:
    """View handlers for the steps of the HDX user onboarding flow.

    Each public handler returns one of the ``Onb*`` responses imported from
    ``user_view_helper`` or the result of ``error_message(...)``.
    """

    def __init__(self):
        pass

    def follow_details(self):
        """
        Step 4: user follows key entities

        Stores the ``HDX_ONBOARDING_FOLLOWS`` flag for the user through the
        ``user_extra_update`` action.

        :return: ``OnbSuccess`` on success; ``OnbNotAuth``, ``OnbUserNotFound``,
                 ``OnbIntegrityErr`` or an ``error_message(...)`` response otherwise
        """
        data_dict = logic.clean_dict(unflatten(logic.tuplize_dict(logic.parse_params(request.form))))
        # Prefer the logged-in user; fall back to the ``id`` posted with the form.
        # NOTE(review): with no logged-in user the posted ``id`` decides which
        # account the action runs as - confirm this is intended for onboarding.
        name = g.user or data_dict.get('id')
        user_obj = model.User.get(name) if name else None
        if user_obj is None:
            # A missing ``id`` or an unknown user used to escape as an
            # unhandled KeyError / AttributeError.
            return OnbUserNotFound

        context = {'model': model, 'session': model.Session, 'user': user_obj.name, 'auth_user_obj': g.userobj}
        try:
            ue_dict = self._get_ue_dict(user_obj.id, user_model.HDX_ONBOARDING_FOLLOWS)
            _get_action('user_extra_update')(context, ue_dict)
        except NotAuthorized:
            return OnbNotAuth
        except NotFound:
            return OnbUserNotFound
        except DataError:
            return OnbIntegrityErr
        except ValidationError as e:
            return error_message(e.error_summary)
        except Exception as e:
            # Keep the traceback in the log; the response carries only the message.
            log.exception('follow_details failed')
            return error_message(str(e))
        return OnbSuccess

    # def request_new_organization(self):
    #     '''
    #     Step 5a: user can request to create a new organization
    #     :return:
    #     '''
    #     context = {'model': model, 'session': model.Session, 'auth_user_obj': g.userobj,
    #                'user': g.user}
    #     try:
    #         _check_access('hdx_send_new_org_request', context)
    #     except NotAuthorized:
    #         return OnbNotAuth
    #
    #     try:
    #         user = model.User.get(context['user'])
    #         data = self._process_new_org_request(user)
    #         self._validate_new_org_request_field(data, context)
    #
    #         _get_action('hdx_send_new_org_request')(context, data)
    #
    #         if data.get('user_extra'):
    #             ue_dict = self._get_ue_dict(user.id, user_model.HDX_ONBOARDING_ORG)
    #             _get_action('user_extra_update')(context, ue_dict)
    #
    #     except hdx_mail.NoRecipientException as e:
    #         error_summary = str(e)
    #         return error_message(error_summary)
    #     except ValidationError as e:
    #         error_summary = e.error_summary.get('Message') if 'Message' in e.error_summary else e.error_summary
    #         return error_message(error_summary)
    #     except Exception as e:
    #         error_summary = str(e)
    #         return error_message(error_summary)
    #     return OnbSuccess

    def request_membership(self):
        """
        Step 5b: user can request membership to an existing organization

        Reads ``org_id`` and ``message`` from the posted form, calls
        ``member_request_create`` and then stores the ``HDX_ONBOARDING_ORG`` flag.

        :return: ``OnbSuccess`` on success; ``OnbNotAuth`` or an
                 ``error_message(...)`` response otherwise
        """
        context = {'model': model, 'session': model.Session,
                   'user': g.user, 'auth_user_obj': g.userobj}
        try:
            _check_access('hdx_send_new_org_request', context)
        except NotAuthorized:
            return OnbNotAuth

        try:
            org_id = request.form.get('org_id', '')
            msg = request.form.get('message', 'please add me to this organization')

            data_dict = {
                'organization': org_id,
                'message': msg,
                'save': u'save',
                'role': u'member',
                'group': org_id
            }
            _get_action('member_request_create')(context, data_dict)

            ue_dict = self._get_ue_dict(g.userobj.id, user_model.HDX_ONBOARDING_ORG)
            _get_action('user_extra_update')(context, ue_dict)

        except hdx_mail.NoRecipientException as e:
            return error_message(_(str(e)))
        except ValidationError as e:
            log.error(str(e))
            return error_message(self._summarize_validation_error(e.error_summary))
        except Exception:
            log.exception('request_membership failed')
            return error_message(_('Request can not be sent. Contact an administrator.'))
        return OnbSuccess

    @staticmethod
    def _summarize_validation_error(error_summary):
        """
        Flatten a validation error summary into one display string.

        :param error_summary: a dict of messages, or any other value
        :return: the dict values joined with spaces, or the JSON dump of a non-dict summary
        """
        if isinstance(error_summary, dict):
            # Values are not guaranteed to be strings (e.g. lists of messages),
            # so stringify each one instead of letting ``join`` raise TypeError.
            return ' '.join(str(value) for value in error_summary.values())
        # ``default=str`` is deliberate: a summary json cannot serialize must
        # not raise from inside the caller's error handler.
        return json.dumps(error_summary, default=str)

    def invite_friends(self):
        """
        Step 6: user can invite friends by email to access HDX

        Stores the ``HDX_ONBOARDING_FRIENDS`` flag, then mails every non-empty
        address among the ``email1``..``email3`` form fields when the
        ``hdx.onboarding.send_confirmation_email`` config value is ``'true'``.

        :return: ``OnbSuccess`` on success; ``OnbNotAuth`` or an
                 ``error_message(...)`` response otherwise
        """
        context = {'model': model, 'session': model.Session, 'auth_user_obj': g.userobj,
                   'user': g.user}
        try:
            _check_access('hdx_basic_user_info', context)
        except NotAuthorized:
            return OnbNotAuth
        try:
            if not g.user:
                return OnbNotAuth
            # usr = g.userobj.display_name or g.user
            user_id = g.userobj.id or g.user
            ue_dict = self._get_ue_dict(user_id, user_model.HDX_ONBOARDING_FRIENDS)
            _get_action('user_extra_update')(context, ue_dict)

            # The setting does not change between recipients, so read it once.
            send_mail = config.get('hdx.onboarding.send_confirmation_email', 'false') == 'true'
            if send_mail:
                subject = u'Invitation to join the Humanitarian Data Exchange (HDX)'
                email_data = {
                    'user_fullname': g.userobj.fullname,
                    'user_email': g.userobj.email,
                }
                cc_recipients_list = [{'display_name': g.userobj.fullname, 'email': g.userobj.email}]
                friends = [request.form.get(field) for field in ('email1', 'email2', 'email3')]
                for friend in friends:
                    if not friend:
                        continue
                    hdx_mailer.mail_recipient([{'display_name': friend, 'email': friend}], subject, email_data,
                                              cc_recipients_list=cc_recipients_list,
                                              snippet='email/content/onboarding_invite_others.html')
        except Exception as e:
            # Keep the traceback in the log; the response carries only the message.
            log.exception('invite_friends failed')
            return error_message(str(e))
        return OnbSuccess

    def _get_ue_dict(self, user_id, key, value='True'):
        """
        Build the payload passed to the ``user_extra_update`` action.

        :param user_id: id stored under ``user_id``
        :param key: user-extra key to set
        :param value: new value for the key (defaults to ``'True'``)
        :return: dict with ``extras`` and ``user_id``
        """
        ue_dict = self._build_extras_dict(key, value)
        ue_dict['user_id'] = user_id
        return ue_dict

    def _build_extras_dict(self, key, value='True'):
        """
        Wrap one key/value pair in the ``extras`` list structure.

        :param key: user-extra key
        :param value: new value for the key (defaults to ``'True'``)
        :return: ``{'extras': [{'key': key, 'new_value': value}]}``
        """
        return {'extras': [{'key': key, 'new_value': value}]}

    def _process_new_org_request(self, user):
        """
        Collect the new-organization request fields from the posted form.

        :param user: object whose ``email``, ``fullname`` and ``name`` are the
                     fallbacks for ``your_email`` / ``your_name``
        :return: dict of request fields
        """
        form = request.form
        org_type = form.get('org_type')
        user_extra = form.get('user_extra')
        data = {'name': form.get('name', ''),
                # 'title': request.form.get('name', ''),
                'description': form.get('description', ''),
                'description_data': form.get('description_data', ''),
                'work_email': form.get('work_email', ''),
                'org_url': form.get('url', ''),
                'acronym': form.get('acronym', ''),
                # '-1' is mapped to an empty org type.
                'org_type': org_type if org_type != '-1' else '',
                'your_email': form.get('your_email') or user.email,
                'your_name': form.get('your_name') or user.fullname or user.name,
                # Only the literal string 'True' is kept.
                'user_extra': user_extra if user_extra == 'True' else None
                }
        return data

    def _validate_new_org_request_field(self, data, context):
        """
        Validate the fields of a new-organization request.

        :param data: dict as built by ``_process_new_org_request``
        :param context: context passed through to the validators
        :raises ValidationError: when a required field is missing or blank, or
                                 when ``work_email`` fails the email validator
        """
        required_fields = ('name', 'description', 'description_data', 'work_email', 'your_name', 'your_email')
        # A missing key counts as empty, so it is reported as a validation
        # error rather than raising KeyError.
        errors = {
            field: [_('should not be empty')]
            for field in required_fields
            if not (data.get(field) or '').strip()
        }
        if errors:
            raise ValidationError(errors)

        user_email_validator = _get_validator('email_validator')
        schema = {'work_email': [user_email_validator, unicode_safe]}
        # Do not name the unused result ``_``: that would shadow the
        # translation function used above and make it an unbound local.
        _converted, field_errors = _validate(data, schema, context)

        if field_errors:
            # Fall back to the whole error dict if the failure is not under 'work_email'.
            raise ValidationError(field_errors.get('work_email') or field_errors)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc