• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dataflake / Products.LDAPMultiPlugins / 4066443713

pending completion
4066443713

push

github

GitHub
Merge pull request #1 from dataflake/config-with-zope-product-template-af52a452

25 of 238 branches covered (10.5%)

Branch coverage included in aggregate %.

7 of 7 new or added lines in 3 files covered. (100.0%)

388 of 763 relevant lines covered (50.85%)

0.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

15.35
/src/Products/LDAPMultiPlugins/LDAPMultiPlugin.py
1
##############################################################################
2
#
3
# Copyright (c) 2005-2021 Jens Vagelpohl and Contributors. All Rights Reserved.
4
#
5
# This software is subject to the provisions of the Zope Public License,
6
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
7
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
8
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
9
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
10
# FOR A PARTICULAR PURPOSE.
11
#
12
##############################################################################
13
""" LDAPMultiPlugin, a LDAP-enabled PluggableAuthService plugin
1✔
14
"""
15

16
import logging
1✔
17
import os
1✔
18
from urllib.parse import quote_plus
1✔
19

20
from AccessControl import ClassSecurityInfo
1✔
21
from AccessControl.class_init import InitializeClass
1✔
22
from Acquisition import aq_base
1✔
23
from App.Common import package_home
1✔
24
from App.special_dtml import DTMLFile
1✔
25
from zope.interface import implementedBy
1✔
26

27
from Products.LDAPUserFolder import manage_addLDAPUserFolder
1✔
28
from Products.PluggableAuthService.interfaces.plugins import \
1✔
29
    IGroupEnumerationPlugin
30
from Products.PluggableAuthService.interfaces.plugins import IGroupsPlugin
1✔
31
from Products.PluggableAuthService.interfaces.plugins import \
1✔
32
    IRoleEnumerationPlugin
33
from Products.PluggableAuthService.interfaces.plugins import \
1✔
34
    IUserEnumerationPlugin
35
from Products.PluggableAuthService.utils import classImplements
1✔
36

37
from .LDAPPluginBase import LDAPPluginBase
1✔
38

39

40
logger = logging.getLogger('event.LDAPMultiPlugin')
1✔
41
_dtmldir = os.path.join(package_home(globals()), 'dtml')
1✔
42
addLDAPMultiPluginForm = DTMLFile('addLDAPMultiPlugin', _dtmldir)
1✔
43

44

45
def manage_addLDAPMultiPlugin(self, id, title, REQUEST=None):
1✔
46
    """ Factory method to instantiate a LDAPMultiPlugin """
47
    # Make sure we really are working in our container (the
48
    # PluggableAuthService object)
49
    self = self.this()
×
50

51
    # Instantiate the folderish adapter object
52
    self._setObject(id, LDAPMultiPlugin(id, title=title))
×
53
    lmp = getattr(aq_base(self), id)
×
54

55
    # Put the "real" LDAPUserFolder inside it
56
    manage_addLDAPUserFolder(lmp)
×
57

58
    # clean out the __allow_groups__ bit because it is not needed here
59
    # and potentially harmful
60
    lmp_base = aq_base(lmp)
×
61
    if hasattr(lmp_base, '__allow_groups__'):
×
62
        del lmp_base.__allow_groups__
×
63

64
    if REQUEST is not None:
×
65
        REQUEST.RESPONSE.redirect('%s/manage_main' % self.absolute_url())
×
66

67

68
class LDAPMultiPlugin(LDAPPluginBase):
1✔
69
    """ The adapter that mediates between the PAS and the LDAPUserFolder """
70
    security = ClassSecurityInfo()
1✔
71
    meta_type = 'LDAP Multi Plugin'
1✔
72
    zmi_icon = 'fas fa-address-book'
1✔
73

74
    @security.private
1✔
75
    def getGroupsForPrincipal(self, user, request=None, attr=None):
1✔
76
        """ Fulfill GroupsPlugin requirements """
77
        view_name = self.getId() + '_getGroupsForPrincipal'
×
78
        criteria = {'id': user.getId(), 'attr': attr}
×
79

80
        cached_info = self.ZCacheable_get(view_name=view_name,
×
81
                                          keywords=criteria,
82
                                          default=None)
83

84
        if cached_info is not None:
×
85
            logger.debug('returning cached results from enumerateUsers')
×
86
            return cached_info
×
87

88
        acl = self._getLDAPUserFolder()
×
89

90
        if acl is None:
×
91
            return ()
×
92

93
        unmangled_userid = self._demangle(user.getId())
×
94
        if unmangled_userid is None:
×
95
            return ()
×
96

97
        ldap_user = acl.getUserById(unmangled_userid)
×
98

99
        if ldap_user is None:
×
100
            return ()
×
101

102
        groups = acl.getGroups(ldap_user.getUserDN(), attr=attr)
×
103

104
        result = tuple(x[0] for x in groups)
×
105
        self.ZCacheable_set(result, view_name=view_name, keywords=criteria)
×
106

107
        return result
×
108

109
    @security.private
1✔
110
    def enumerateUsers(self, id=None, login=None, exact_match=0, sort_by=None,
1✔
111
                       max_results=None, **kw):
112
        """ Fulfill the UserEnumerationPlugin requirements """
113
        view_name = self.getId() + '_enumerateUsers'
×
114
        criteria = {'id': id, 'login': login, 'exact_match': exact_match,
×
115
                    'sort_by': sort_by, 'max_results': max_results}
116
        criteria.update(kw)
×
117

118
        cached_info = self.ZCacheable_get(view_name=view_name,
×
119
                                          keywords=criteria,
120
                                          default=None)
121

122
        if cached_info is not None:
×
123
            logger.debug('returning cached results from enumerateUsers')
×
124
            return cached_info
×
125

126
        result = []
×
127
        acl = self._getLDAPUserFolder()
×
128
        login_attr = acl.getProperty('_login_attr')
×
129
        uid_attr = acl.getProperty('_uid_attr')
×
130
        rdn_attr = acl.getProperty('_rdnattr')
×
131
        plugin_id = self.getId()
×
132
        edit_url = f'{plugin_id}/{acl.getId()}/manage_userrecords'
×
133

134
        if acl is None:
×
135
            return ()
×
136

137
        if exact_match and (id or login):
×
138
            if id:
×
139
                ldap_user = acl.getUserById(id)
×
140
                if ldap_user is not None and ldap_user.getId() != id:
×
141
                    ldap_user = None
×
142
            elif login:
×
143
                ldap_user = acl.getUser(login)
×
144
                if ldap_user is not None and ldap_user.getUserName() != login:
×
145
                    ldap_user = None
×
146

147
            if ldap_user is not None:
×
148
                qs = 'user_dn=%s' % quote_plus(ldap_user.getUserDN())
×
149
                result.append({'id': ldap_user.getId(),
×
150
                               'login': ldap_user.getProperty(login_attr),
151
                               'pluginid': plugin_id,
152
                               'editurl': f'{edit_url}?{qs}'})
153
        else:
154
            l_results = []
×
155
            seen = []
×
156
            ldap_criteria = {}
×
157

158
            if id:
×
159
                if uid_attr == 'dn':
×
160
                    # Workaround: Due to the way findUser reacts when a DN
161
                    # is searched for I need to hack around it... This
162
                    # limits the usefulness of searching by ID if the user
163
                    # folder uses the full DN aas user ID.
164
                    ldap_criteria[rdn_attr] = id
×
165
                else:
166
                    ldap_criteria[uid_attr] = id
×
167

168
            if login:
×
169
                ldap_criteria[login_attr] = login
×
170

171
            for key, val in kw.items():
×
172
                if key not in (login_attr, uid_attr):
×
173
                    ldap_criteria[key] = val
×
174

175
            # If no criteria are given create a criteria set that will
176
            # return all users
177
            if not login and not id:
×
178
                ldap_criteria[login_attr] = ''
×
179

180
            l_results = acl.searchUsers(exact_match=exact_match,
×
181
                                        **ldap_criteria)
182

183
            for l_res in l_results:
×
184

185
                # If the LDAPUserFolder returns an error, bail
186
                if l_res.get('sn', '') == 'Error' and \
×
187
                   l_res.get('cn', '') == 'n/a':
188
                    return ()
×
189

190
                if l_res['dn'] not in seen:
×
191
                    l_res['id'] = l_res[uid_attr]
×
192
                    l_res['login'] = l_res[login_attr]
×
193
                    l_res['pluginid'] = plugin_id
×
194
                    quoted_dn = quote_plus(l_res['dn'])
×
195
                    l_res['editurl'] = f'{edit_url}?user_dn={quoted_dn}'
×
196
                    result.append(l_res)
×
197
                    seen.append(l_res['dn'])
×
198

199
            if sort_by is not None:
×
200
                result.sort(key=lambda item: item.get(sort_by, '').lower())
×
201

202
            if isinstance(max_results, int) and len(result) > max_results:
×
203
                result = result[:max_results-1]
×
204

205
        result = tuple(result)
×
206
        self.ZCacheable_set(result, view_name=view_name, keywords=criteria)
×
207

208
        return result
×
209

210
    @security.private
1✔
211
    def enumerateGroups(self, id=None, exact_match=False, sort_by=None,
1✔
212
                        max_results=None, **kw):
213
        """ Fulfill the GroupEnumerationPlugin requirements """
214
        view_name = self.getId() + '_enumerateGroups'
×
215
        criteria = {'id': id, 'exact_match': exact_match,
×
216
                    'sort_by': sort_by, 'max_results': max_results}
217
        criteria.update(kw)
×
218

219
        cached_info = self.ZCacheable_get(view_name=view_name,
×
220
                                          keywords=criteria,
221
                                          default=None)
222

223
        if cached_info is not None:
×
224
            logger.debug('returning cached results from enumerateGroups')
×
225
            return cached_info
×
226

227
        acl = self._getLDAPUserFolder()
×
228

229
        if acl is None:
×
230
            return ()
×
231

232
        if (id is not None and not exact_match and not kw):
×
233
            # likely from a PAS.getUserById(). In any case 'id' and
234
            # 'exact_match' means only a single result should be
235
            # available so try to fetch specific group info from
236
            # cache.
237
            group_info = self._getGroupInfoCache(id)
×
238
            if group_info is not None:
×
239
                return (group_info,)
×
240

241
        if id is None and exact_match:
×
242
            raise ValueError('Exact Match requested but no id provided')
×
243
        elif id is not None:
×
244
            kw[self.groupid_attr] = id
×
245

246
        plugin_id = self.getId()
×
247

248
        results = acl.searchGroups(exact_match=exact_match, **kw)
×
249

250
        if len(results) == 1 and results[0]['cn'] == 'n/a':
×
251
            # we didn't give enough known criteria for searches
252
            return ()
×
253

254
        if isinstance(max_results, int) and len(results) > max_results:
×
255
            results = results[:max_results+1]
×
256

257
        for rec in results:
×
258
            rec['pluginid'] = plugin_id
×
259
            rec['id'] = rec[self.groupid_attr]
×
260
            self._setGroupInfoCache(rec)
×
261

262
        results = tuple(results)
×
263
        self.ZCacheable_set(results, view_name=view_name, keywords=criteria)
×
264

265
        return results
×
266

267
    @security.private
1✔
268
    def enumerateRoles(self, id=None, exact_match=0, sort_by=None,
1✔
269
                       max_results=None, **kw):
270
        """ Fulfill the RoleEnumerationPlugin requirements """
271
        # For LDAP, roles and groups are really one and the same thing.
272
        # We can simply call enumerateGroups here.
273
        return self.enumerateGroups(id=id, exact_match=exact_match,
×
274
                                    sort_by=sort_by, max_results=max_results,
275
                                    **kw)
276

277

278
classImplements(LDAPMultiPlugin,
1✔
279
                IUserEnumerationPlugin,
280
                IGroupsPlugin,
281
                IGroupEnumerationPlugin,
282
                IRoleEnumerationPlugin,
283
                *implementedBy(LDAPPluginBase))
284

285
InitializeClass(LDAPMultiPlugin)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc