• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

keitaroinc / ckanext-saml2auth / 15157215686

21 May 2025 08:22AM UTC coverage: 91.329% (+0.09%) from 91.241%
15157215686

Pull #126

github

web-flow
Merge 838ffcd0c into b2c6cfc2f
Pull Request #126: Okfn updates

128 of 141 new or added lines in 11 files covered. (90.78%)

10 existing lines in 4 files now uncovered.

969 of 1061 relevant lines covered (91.33%)

3.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.78
/ckanext/saml2auth/plugin.py
1
# encoding: utf-8
2

3
"""
4✔
4
Copyright (c) 2020 Keitaro AB
5

6
This program is free software: you can redistribute it and/or modify
7
it under the terms of the GNU Affero General Public License as
8
published by the Free Software Foundation, either version 3 of the
9
License, or (at your option) any later version.
10

11
This program is distributed in the hope that it will be useful,
12
but WITHOUT ANY WARRANTY; without even the implied warranty of
13
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
GNU Affero General Public License for more details.
15

16
You should have received a copy of the GNU Affero General Public License
17
along with this program.  If not, see <https://www.gnu.org/licenses/>.
18
"""
19
import logging
4✔
20

21
from saml2.client_base import LogoutError
4✔
22
from saml2 import entity
4✔
23

24
from flask import session, redirect, make_response
4✔
25

26
import ckan.plugins as plugins
4✔
27
import ckan.plugins.toolkit as toolkit
4✔
28
from ckan.common import g
4✔
29
import ckan.lib.base as base
4✔
30

31
from ckanext.saml2auth.views.saml2auth import saml2auth
4✔
32
from ckanext.saml2auth.cache import get_subject_id, get_saml_session_info
4✔
33
from ckanext.saml2auth.spconfig import get_config as sp_config
4✔
34
from ckanext.saml2auth import helpers as h
4✔
35
from saml2.s_utils import UnsupportedBinding
4✔
36

37
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
4✔
38

39

40
class Saml2AuthPlugin(plugins.SingletonPlugin):
    """CKAN plugin wiring SAML2 authentication into the application.

    Implements the configurer, blueprint, configurable, template-helper and
    authenticator plugin interfaces declared below.
    """

    plugins.implements(plugins.IConfigurer)
    plugins.implements(plugins.IBlueprint)
    plugins.implements(plugins.IConfigurable)
    plugins.implements(plugins.ITemplateHelpers)
    plugins.implements(plugins.IAuthenticator, inherit=True)

    # ITemplateHelpers

    def get_helpers(self):
        """Return the template helpers exposed by this plugin, keyed by name."""
        helpers = {}
        helpers['is_default_login_enabled'] = h.is_default_login_enabled
        helpers['get_saml2auth_login_button_text'] = h.get_saml2auth_login_button_text
        return helpers

    # IConfigurable

    def configure(self, config):
        """Validate the plugin's configuration.

        Raises:
            RuntimeError: if a mandatory option is missing, if neither the
                first/last name pair nor the full name option is set, or if
                the ACS endpoint is set but does not start with a slash.
        """
        # The e-mail option is always mandatory; the remote metadata URL only
        # becomes mandatory when no local metadata path has been configured.
        required = ['ckanext.saml2auth.user_email']
        if not config.get('ckanext.saml2auth.idp_metadata.local_path'):
            required.append('ckanext.saml2auth.idp_metadata.remote_url')

        for key in required:
            if config.get(key, None):
                continue
            raise RuntimeError(
                "{0} is not configured. Please amend your .ini file.".format(key))

        # Either both first and last name options, or the full name option,
        # must be present.
        first_name = config.get('ckanext.saml2auth.user_firstname')
        last_name = config.get('ckanext.saml2auth.user_lastname')
        full_name = config.get('ckanext.saml2auth.user_fullname')

        if not (full_name or (first_name and last_name)):
            raise RuntimeError('''You need to provide both ckanext.saml2auth.user_firstname
            + ckanext.saml2auth.user_lastname or ckanext.saml2auth.user_fullname'''.strip())

        # The ACS endpoint is optional, but when given it must be a path.
        endpoint = config.get('ckanext.saml2auth.acs_endpoint')
        if endpoint:
            if not endpoint.startswith('/'):
                raise RuntimeError(
                    'ckanext.saml2auth.acs_endpoint should start with a slash ("/")')

    # IBlueprint

    def get_blueprint(self):
        """Return the list of blueprints registered by this plugin."""
        blueprints = [saml2auth]
        return blueprints

    # IConfigurer

    def update_config(self, config_):
        """Register this plugin's templates, public files and resources."""
        toolkit.add_template_directory(config_, 'templates')
        toolkit.add_public_directory(config_, 'public')
        toolkit.add_resource('fanstatic', 'saml2auth')

    # IAuthenticator

    def logout(self):
        """Run the single-logout step and expire the session cookies.

        Returns whatever ``_perform_slo()`` produced (possibly ``None``); when
        it is a truthy response, the cookies are expired on it before it is
        returned.
        """
        slo_response = _perform_slo()

        if slo_response:
            cookie_domain = h.get_site_domain_for_cookie()
            # Clear session cookie in the browser
            slo_response.set_cookie('ckan', domain=cookie_domain, expires=0)

            needs_auth_tkt = not toolkit.check_ckan_version(min_version="2.10")
            if needs_auth_tkt:
                # CKAN <= 2.9.x also sets auth_tkt cookie
                slo_response.set_cookie('auth_tkt', domain=cookie_domain, expires=0)

        if not g.userobj:
            log.info(u'No user was logged in!')
        else:
            log.info(u'User {0}<{1}> logged out successfully'.format(g.userobj.name, g.userobj.email))

        return slo_response
4✔
117

118

119
def _perform_slo():
    """Start a SAML Single Logout (SLO) for the current session.

    Returns:
        A response object that continues the logout at the IdP (a rendered
        auto-submit form for the HTTP-POST binding, or a 302 redirect for the
        HTTP-Redirect binding), or ``None`` when no external logout is
        performed: it is configured to be skipped, the session has no subject
        id, the IdP does not support SLO, or no usable logout info came back.
    """
    config = sp_config()
    if config.get('logout_expected_binding') == 'skip-external-logout':
        log.debug('Skipping external logout')
        return None

    client = h.saml_client(config)
    saml_session_info = get_saml_session_info(session)
    subject_id = get_subject_id(session)

    if subject_id is None:
        log.warning(
            'The session does not contain the subject id for user {}'.format(g.user))
        return None

    try:
        client.users.add_information_about_person(saml_session_info)
        result = client.global_logout(name_id=subject_id)
    except (LogoutError, UnsupportedBinding) as e:
        # Best effort: the local logout proceeds without contacting the IdP.
        log.exception(
            'SLO not supported by IDP: {}'.format(e))
        return None

    if not result:
        log.error(
            'Looks like the user {} is not logged in any IdP/AA'.format(subject_id))
        # Nothing to iterate over; bail out instead of calling len()/items()
        # on an empty (or None) result.
        return None

    if len(result) > 1:
        log.error(
            'Sorry, I do not know how to logout from several sources.'
            ' I will logout just from the first one')

    # ``result`` is treated as a mapping of entity id -> logout info. Only
    # tuple entries, unpacked as (binding, http_info), can be turned into a
    # browser response; presumably other entries were already handled by the
    # SAML client -- TODO confirm against pysaml2.
    for logout_info in result.values():
        if not isinstance(logout_info, tuple):
            continue

        binding, http_info = logout_info

        # Return on the first usable entry so the behaviour matches the
        # "just the first one" message above, rather than letting a later
        # entry silently overwrite the response.
        if binding == entity.BINDING_HTTP_POST:
            log.debug(
                'Returning form to the IdP to continue the logout process')
            extra_vars = {
                u'body': ''.join(http_info['data'])
            }
            return make_response(
                base.render(u'saml2auth/idp_logout.html', extra_vars)
            )

        if binding == entity.BINDING_HTTP_REDIRECT:
            log.debug(
                'Redirecting to the IdP to continue the logout process')
            return redirect(h.get_location(http_info), code=302)

        log.error(
            'Failed to log out from Idp. Unknown binding: {}'.format(binding))

    return None
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc