• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine-python / 16367912334

18 Jul 2025 10:06AM UTC coverage: 76.934% (+0.1%) from 76.806%
16367912334

push

github

web-flow
ci: use Ruff as new formatter and linter (#233)

* wip

* pycodestyle

* update dependencies

* skl2onnx

* use ruff

* apply formatter

* apply lint auto fixes

* manually apply lints

* change check

* ruff ci from branch

2805 of 3646 relevant lines covered (76.93%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.07
geoengine/permissions.py
1
"""
2
A wrapper for the GeoEngine permissions API.
3
"""
4

5
from __future__ import annotations
1✔
6

7
import ast
1✔
8
from enum import Enum
1✔
9
from uuid import UUID
1✔
10

11
import geoengine_openapi_client
1✔
12
import geoengine_openapi_client.api
1✔
13
import geoengine_openapi_client.models
1✔
14
import geoengine_openapi_client.models.role
1✔
15

16
from geoengine.auth import get_session
1✔
17
from geoengine.error import GeoEngineException
1✔
18
from geoengine.resource_identifier import Resource
1✔
19

20

21
class RoleId:
    """A wrapper for a role id.

    Two instances are equal when they wrap the same UUID. Instances are
    hashable (by the wrapped UUID), so they can be used in sets and as
    dictionary keys.
    """

    __role_id: UUID

    def __init__(self, role_id: UUID) -> None:
        """Wrap the given UUID as a role id"""
        self.__role_id = role_id

    @classmethod
    def from_response(cls, response: dict[str, str]) -> RoleId:
        """Parse a http response to a `RoleId`.

        Raises a `GeoEngineException` created from the response if it
        contains no `"id"` entry.
        """

        if "id" not in response:
            raise GeoEngineException(response)

        # Construct via `cls` so that subclasses get instances of their own type
        return cls(UUID(response["id"]))

    def __eq__(self, other) -> bool:
        """Checks if two role ids are equal"""
        if not isinstance(other, self.__class__):
            return False

        return self.__role_id == other.__role_id  # pylint: disable=protected-access

    def __hash__(self) -> int:
        """Hash by the wrapped UUID, consistent with `__eq__`"""
        # Defining `__eq__` alone would set `__hash__` to None and make instances unhashable
        return hash(self.__role_id)

    def __str__(self) -> str:
        return str(self.__role_id)

    def __repr__(self) -> str:
        return repr(self.__role_id)
×
52

53

54
class Role:
    """A wrapper for a role, consisting of its id and its name"""

    name: str
    id: RoleId

    def __init__(self, role_id: UUID | RoleId | str, role_name: str):
        """Create a role with name and id.

        A `UUID` or a UUID string given as `role_id` is wrapped into a
        `RoleId`; any other value is stored unchanged.
        """

        if isinstance(role_id, UUID):
            real_id = RoleId(role_id)
        elif isinstance(role_id, str):
            real_id = RoleId(UUID(role_id))
        else:
            real_id = role_id

        self.id = real_id
        self.name = role_name

    @classmethod
    def from_response(cls, response: geoengine_openapi_client.models.role.Role) -> Role:
        """Parse a http response to a `Role`"""

        # Construct via `cls` so that subclasses get instances of their own type
        return cls(response.id, response.name)

    def __eq__(self, other) -> bool:
        """Checks if two roles are equal, i.e. have the same id and the same name"""
        if not isinstance(other, self.__class__):
            return False

        return self.id == other.id and self.name == other.name

    def role_id(self) -> RoleId:
        """get the role id"""
        return self.id

    def __repr__(self) -> str:
        return f"id: {self.id!r}, name: {self.name!r}"
×
95

96

97
class UserId:
    """A wrapper for a user id.

    Two instances are equal when they wrap the same UUID. Instances are
    hashable (by the wrapped UUID), so they can be used in sets and as
    dictionary keys.
    """

    def __init__(self, user_id: UUID) -> None:
        """Wrap the given UUID as a user id"""
        self.__user_id = user_id

    @classmethod
    def from_response(cls, response: dict[str, str]) -> UserId:
        """Parse a http response to a `UserId`.

        Raises a `GeoEngineException` created from the response if it
        contains no `"id"` entry.
        """
        if "id" not in response:
            raise GeoEngineException(response)

        # Construct via `cls` so that subclasses get instances of their own type
        return cls(UUID(response["id"]))

    def __eq__(self, other) -> bool:
        """Checks if two user ids are equal"""
        if not isinstance(other, self.__class__):
            return False

        return self.__user_id == other.__user_id  # pylint: disable=protected-access

    def __hash__(self) -> int:
        """Hash by the wrapped UUID, consistent with `__eq__`"""
        # Defining `__eq__` alone would set `__hash__` to None and make instances unhashable
        return hash(self.__user_id)

    def __str__(self) -> str:
        return str(self.__user_id)

    def __repr__(self) -> str:
        return repr(self.__user_id)
×
126

127

128
class PermissionListing:
    """
    Groups a permission together with the resource and the role it is listed for
    """

    permission: Permission
    resource: Resource
    role: Role

    def __init__(self, permission: Permission, resource: Resource, role: Role):
        """Store the given permission, resource and role"""
        self.permission = permission
        self.resource = resource
        self.role = role

    @classmethod
    def from_response(cls, response: geoengine_openapi_client.models.PermissionListing) -> PermissionListing:
        """Build a PermissionListing from the API client's PermissionListing model"""
        # Convert the parts in the same order as they are passed on below
        parsed_permission = Permission.from_response(response.permission)
        parsed_resource = Resource.from_response(response.resource)
        parsed_role = Role.from_response(response.role)

        return PermissionListing(permission=parsed_permission, resource=parsed_resource, role=parsed_role)

    def __eq__(self, other) -> bool:
        """Two listings are equal if permission, resource and role are all equal"""
        if isinstance(other, self.__class__):
            return self.permission == other.permission and self.resource == other.resource and self.role == other.role
        return False

    def __repr__(self) -> str:
        return f"Role: {self.role!r}, Resource: {self.resource!r}, Permission: {self.permission!r}"
169

170

171
class Permission(str, Enum):
    """A permission, either `Read` or `Owner`"""

    READ = "Read"
    OWNER = "Owner"

    def to_api_dict(self) -> geoengine_openapi_client.Permission:
        """Convert to the permission type of the API client"""
        api_permission = geoengine_openapi_client.Permission(self.value)
        return api_permission

    @classmethod
    def from_response(cls, response: geoengine_openapi_client.Permission) -> Permission:
        """Look up the `Permission` member that matches the API client's permission value"""
        return cls(response)
1✔
184

185

186
# Fixed role id for the admin role (presumably the id the Geo Engine backend uses for it; verify)
ADMIN_ROLE_ID: RoleId = RoleId(UUID("d5328854-6190-4af9-ad69-4e74b0961ac9"))
1✔
187
# Fixed role id for registered users (presumably the id the Geo Engine backend uses for it; verify)
REGISTERED_USER_ROLE_ID: RoleId = RoleId(UUID("4e8081b6-8aa6-4275-af0c-2fa2da557d28"))
1✔
188
# Fixed role id for anonymous users (presumably the id the Geo Engine backend uses for it; verify)
ANONYMOUS_USER_ROLE_ID: RoleId = RoleId(UUID("fd8e87bf-515c-4f36-8da6-1a53702ff102"))
1✔
189

190

191
def add_permission(role: RoleId, resource: Resource, permission: Permission, timeout: int = 60):
    """Add a permission to a resource for a role. Requires admin role.

    `timeout` is passed to the API call as its request timeout.
    """

    session = get_session()

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        permissions_api = geoengine_openapi_client.PermissionsApi(api_client)
        permissions_api.add_permission_handler(
            geoengine_openapi_client.PermissionRequest(
                role_id=str(role),
                resource=resource.to_api_dict(),
                permission=permission.to_api_dict(),
            ),
            # The timeout is an option of the handler call (as in `list_permissions`),
            # not a field of the request model
            _request_timeout=timeout,
        )
206

207

208
def remove_permission(role: RoleId, resource: Resource, permission: Permission, timeout: int = 60):
    """Removes a permission to a resource from a role. Requires admin role.

    `timeout` is passed to the API call as its request timeout.
    """

    session = get_session()

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        permissions_api = geoengine_openapi_client.PermissionsApi(api_client)
        permissions_api.remove_permission_handler(
            geoengine_openapi_client.PermissionRequest(
                role_id=str(role),
                resource=resource.to_api_dict(),
                permission=permission.to_api_dict(),
            ),
            # The timeout is an option of the handler call (as in `list_permissions`),
            # not a field of the request model
            _request_timeout=timeout,
        )
223

224

225
def list_permissions(resource: Resource, timeout: int = 60, offset=0, limit=20) -> list[PermissionListing]:
    """Lists the roles and permissions assigned to a resource.

    `offset` and `limit` are forwarded to the API call, `timeout` is used as its request timeout.
    """

    configuration = get_session().configuration

    with geoengine_openapi_client.ApiClient(configuration) as client:
        api = geoengine_openapi_client.PermissionsApi(client)
        listings = api.get_resource_permissions_handler(
            resource_id=resource.id,
            resource_type=resource.type,
            offset=offset,
            limit=limit,
            _request_timeout=timeout,
        )

        # Convert every API model into the wrapper type of this module
        return list(map(PermissionListing.from_response, listings))
1✔
237

238

239
def add_role(name: str, timeout: int = 60) -> RoleId:
    """Add a new role. Requires admin role.

    `timeout` is passed to the API call as its request timeout.
    Returns the `RoleId` parsed from the response of the API call.
    """

    session = get_session()

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        user_api = geoengine_openapi_client.UserApi(api_client)
        # The timeout is an option of the handler call (as in `remove_role`),
        # not a field of the `AddRole` model
        response = user_api.add_role_handler(
            geoengine_openapi_client.AddRole(name=name),
            _request_timeout=timeout,
        )

    # TODO: find out why JSON string is faulty
    # parsed_response = json.loads(response)
    # Until then the response is parsed as a Python literal instead of as JSON
    parsed_response: dict[str, str] = ast.literal_eval(response)

    return RoleId.from_response(parsed_response)
×
253

254

255
def remove_role(role: RoleId, timeout: int = 60):
    """Remove a role. Requires admin role.

    `timeout` is used as the request timeout of the API call.
    """

    configuration = get_session().configuration

    with geoengine_openapi_client.ApiClient(configuration) as client:
        api = geoengine_openapi_client.UserApi(client)
        # The handler takes the role id in its string form
        role_id = str(role)
        api.remove_role_handler(role_id, _request_timeout=timeout)
×
263

264

265
def assign_role(role: RoleId, user: UserId, timeout: int = 60):
    """Assign a role to a user. Requires admin role.

    `timeout` is used as the request timeout of the API call.
    """

    configuration = get_session().configuration

    with geoengine_openapi_client.ApiClient(configuration) as client:
        api = geoengine_openapi_client.UserApi(client)
        # The handler takes the user id first, then the role id, both as strings
        user_id = str(user)
        role_id = str(role)
        api.assign_role_handler(user_id, role_id, _request_timeout=timeout)
×
273

274

275
def revoke_role(role: RoleId, user: UserId, timeout: int = 60):
    """Revoke a role from a user. Requires admin role.

    `timeout` is used as the request timeout of the API call.
    """

    configuration = get_session().configuration

    with geoengine_openapi_client.ApiClient(configuration) as client:
        api = geoengine_openapi_client.UserApi(client)
        # The handler takes the user id first, then the role id, both as strings
        user_id = str(user)
        role_id = str(role)
        api.revoke_role_handler(user_id, role_id, _request_timeout=timeout)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc