• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 11288123511

11 Oct 2024 07:23AM UTC coverage: 90.667% (+0.2%) from 90.477%
11288123511

Pull #407

github

web-flow
Merge 20c6c8af6 into 5b095d795
Pull Request #407: feat!: add data connectors

1226 of 1325 new or added lines in 28 files covered. (92.53%)

3 existing lines in 3 files now uncovered.

10589 of 11679 relevant lines covered (90.67%)

1.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.11
/bases/renku_data_services/background_jobs/core.py
1
"""Different utility functions for background jobs."""
2✔
2

3
import logging
2✔
4

5
from authzed.api.v1.core_pb2 import ObjectReference, Relationship, RelationshipUpdate, SubjectReference
2✔
6
from authzed.api.v1.permission_service_pb2 import (
2✔
7
    Consistency,
8
    LookupResourcesRequest,
9
    ReadRelationshipsRequest,
10
    RelationshipFilter,
11
    SubjectFilter,
12
    WriteRelationshipsRequest,
13
)
14
from ulid import ULID
2✔
15

16
from renku_data_services.authz.authz import Authz, ResourceType, _AuthzConverter, _Relation
2✔
17
from renku_data_services.authz.models import Scope
2✔
18
from renku_data_services.background_jobs.config import SyncConfig
2✔
19
from renku_data_services.base_models.core import InternalServiceAdmin, ServiceAdminId
2✔
20
from renku_data_services.errors import errors
2✔
21
from renku_data_services.message_queue.avro_models.io.renku.events import v2
2✔
22
from renku_data_services.message_queue.converters import EventConverter
2✔
23
from renku_data_services.namespace.models import NamespaceKind
2✔
24

25

26
async def sync_user_namespaces(config: SyncConfig) -> None:
    """Lists all user namespaces in the database and adds them to Authzed and the event queue.

    For every user namespace returned by the group repository:

    * the relationships produced by ``Authz._add_user_namespace`` are written to Authzed, and
    * the ``v2.UserAdded`` events are stored through the event repository,

    using a database session and transaction dedicated to that namespace. If writing to Authzed or
    storing the events fails, the error is logged, the database transaction is rolled back and the
    loop continues with the next namespace. Counters of successful writes are logged at the end.

    Args:
        config: Background-job configuration providing the authz config, the group and event
            repositories and the database session maker.
    """
    authz = Authz(config.authz_config)
    user_namespaces = config.group_repo._get_user_namespaces()
    logging.info("Start syncing user namespaces to the authorization DB and message queue")
    num_authz: int = 0
    num_events: int = 0
    num_total: int = 0
    async for user_namespace in user_namespaces:
        num_total += 1
        events = EventConverter.to_events(user_namespace, v2.UserAdded)
        authz_change = authz._add_user_namespace(user_namespace.namespace)
        session = config.session_maker()
        # The transaction is started inside the outer `try` so that the session is closed in
        # `finally` even when starting the transaction itself fails.
        try:
            tx = session.begin()
            await tx.start()
            try:
                await authz.client.WriteRelationships(authz_change.apply)
                num_authz += 1
                for event in events:
                    await config.event_repo.store_event(session, event)
            except Exception as err:
                # NOTE: The Authz changes are deliberately not rolled back here: it is acceptable for
                # something to be in the Authz DB but not in the message queue, but not vice versa.
                logging.error(f"Failed to sync user namespace {user_namespace} because {err}")
                await tx.rollback()
            else:
                await tx.commit()
                # Count the events only once they have been committed.
                num_events += 1
        finally:
            await session.close()
    logging.info(f"Wrote authorization changes for {num_authz}/{num_total} user namespaces")
    logging.info(f"Wrote to event queue database for {num_events}/{num_total} user namespaces")
58

59

60
async def bootstrap_user_namespaces(config: SyncConfig) -> None:
    """Synchronize user namespaces to the authorization database only if none are already present.

    Probes Authzed for ``owner`` relationships on ``user_namespace`` resources, pulling at most five
    entries from the response stream. When five are found, the authorization database is considered
    already populated and nothing is synced. Otherwise ``sync_user_namespaces`` is run.

    Args:
        config: Background-job configuration providing the authz config (and everything
            ``sync_user_namespaces`` needs).
    """
    authz = Authz(config.authz_config)
    owner_filter = RelationshipFilter(
        resource_type=ResourceType.user_namespace.value, optional_relation=_Relation.owner.value
    )
    stream = aiter(authz.client.ReadRelationships(ReadRelationshipsRequest(relationship_filter=owner_filter)))

    # Pull exactly five items (or fewer, if the stream ends); `None` marks an exhausted stream.
    found = 0
    for _attempt in range(5):
        if await anext(stream, None) is None:
            continue
        found += 1

    if found < 5:
        await sync_user_namespaces(config)
        return

    logging.info(
        "Found at least 5 user namespace in the authorization database, "
        "will not sync user namespaces to authorization."
    )
83

84

85
async def fix_mismatched_project_namespace_ids(config: SyncConfig) -> None:
    """Fixes a problem where the project namespace relationship for projects has the wrong group ID.

    Reads, with full consistency, every ``project_namespace`` relationship from a project to a group
    in Authzed and, for each one:

    * deletes the relationship when the project cannot be found in the database;
    * skips it when the project's namespace is not a group namespace;
    * otherwise, when the group ID stored in Authzed differs from the project's group ID in the
      database, replaces it with a relationship pointing to the correct group. The new relationship
      is touched and the old one deleted in a single write request.

    Args:
        config: Background-job configuration providing the authz config and the project repository.
    """
    api_user = InternalServiceAdmin(id=ServiceAdminId.migrations)
    authz = Authz(config.authz_config)
    res = authz.client.ReadRelationships(
        ReadRelationshipsRequest(
            consistency=Consistency(fully_consistent=True),
            relationship_filter=RelationshipFilter(
                resource_type=ResourceType.project.value,
                optional_relation=_Relation.project_namespace.value,
                optional_subject_filter=SubjectFilter(subject_type=ResourceType.group.value),
            ),
        )
    )
    async for rel in res:
        logging.info(f"Checking project namespace - group relation {rel} for correct group ID")
        project_id = rel.relationship.resource.object_id
        try:
            project = await config.project_repo.get_project(api_user, project_id)
        except errors.MissingResourceError:
            logging.info(f"Couldn't find project {project_id}, deleting relation")
            # The Authzed client is async: await the write so that it completes, and any error
            # surfaces, before moving on to the next relationship.
            await authz.client.WriteRelationships(
                WriteRelationshipsRequest(
                    updates=[
                        RelationshipUpdate(
                            operation=RelationshipUpdate.OPERATION_DELETE,
                            relationship=rel.relationship,
                        ),
                    ]
                )
            )
            continue

        if project.namespace.kind != NamespaceKind.group:
            continue
        correct_group_id = project.namespace.underlying_resource_id
        authzed_group_id = rel.relationship.subject.object.object_id
        # Authzed object IDs are strings. Compare against the same string form that is written
        # below, so an already-correct relationship is never "corrected" (touch + delete of the
        # same relationship).
        if authzed_group_id != str(correct_group_id):
            logging.info(
                f"The project namespace ID in Authzed {authzed_group_id} "
                f"does not match the expected group ID {correct_group_id}, correcting it..."
            )
            await authz.client.WriteRelationships(
                WriteRelationshipsRequest(
                    updates=[
                        RelationshipUpdate(
                            operation=RelationshipUpdate.OPERATION_TOUCH,
                            relationship=Relationship(
                                resource=rel.relationship.resource,
                                relation=rel.relationship.relation,
                                subject=SubjectReference(
                                    object=ObjectReference(
                                        object_type=ResourceType.group.value, object_id=str(correct_group_id)
                                    )
                                ),
                            ),
                        ),
                        RelationshipUpdate(
                            operation=RelationshipUpdate.OPERATION_DELETE,
                            relationship=rel.relationship,
                        ),
                    ]
                )
            )
149

150

151
async def migrate_groups_make_all_public(config: SyncConfig) -> None:
    """Update existing groups to make them public.

    Collects every group that has a ``group_platform`` relationship in Authzed. Groups that the
    anonymous user can already read are left alone. Every remaining group gets two
    ``public_viewer`` relationships written with TOUCH: one for all users and one for anonymous
    users.

    Args:
        config: Background-job configuration providing the authz config.
    """
    logger = logging.getLogger("background_jobs").getChild(migrate_groups_make_all_public.__name__)

    authz = Authz(config.authz_config)
    platform_filter = RelationshipFilter(
        resource_type=ResourceType.group.value,
        optional_relation=_Relation.group_platform.value,
    )
    group_stream = authz.client.ReadRelationships(ReadRelationshipsRequest(relationship_filter=platform_filter))
    all_group_ids: set[str] = {item.relationship.resource.object_id async for item in group_stream}
    logger.info(f"All groups = {len(all_group_ids)}")
    logger.info(f"All groups = {all_group_ids}")

    # Groups the anonymous user can already read are already public.
    anonymous_read = LookupResourcesRequest(
        resource_object_type=ResourceType.group.value,
        permission=Scope.READ.value,
        subject=SubjectReference(object=_AuthzConverter.anonymous_user()),
    )
    public_stream = authz.client.LookupResources(anonymous_read)
    public_group_ids: set[str] = {item.resource_object_id async for item in public_stream}
    logger.info(f"Public groups = {len(public_group_ids)}")
    logger.info(f"Public groups = {public_group_ids}")

    groups_to_process = all_group_ids - public_group_ids
    logger.info(f"Groups to process = {groups_to_process}")

    viewer_subjects = (
        SubjectReference(object=_AuthzConverter.all_users()),
        SubjectReference(object=_AuthzConverter.anonymous_users()),
    )
    for group_id in groups_to_process:
        group_res = _AuthzConverter.group(ULID.from_str(group_id))
        touches = [
            RelationshipUpdate(
                operation=RelationshipUpdate.OPERATION_TOUCH,
                relationship=Relationship(
                    resource=group_res,
                    relation=_Relation.public_viewer.value,
                    subject=viewer,
                ),
            )
            for viewer in viewer_subjects
        ]
        await authz.client.WriteRelationships(WriteRelationshipsRequest(updates=touches))
        logger.info(f"Made group {group_id} public")
208

209

210
async def migrate_user_namespaces_make_all_public(config: SyncConfig) -> None:
    """Update existing user namespaces to make them public.

    Collects every user namespace that has a ``user_namespace_platform`` relationship in Authzed.
    Namespaces that the anonymous user can already read are left alone. Every remaining namespace
    gets two ``public_viewer`` relationships written with TOUCH: one for all users and one for
    anonymous users.

    Args:
        config: Background-job configuration providing the authz config.
    """
    logger = logging.getLogger("background_jobs").getChild(migrate_user_namespaces_make_all_public.__name__)

    authz = Authz(config.authz_config)
    platform_filter = RelationshipFilter(
        resource_type=ResourceType.user_namespace.value,
        optional_relation=_Relation.user_namespace_platform.value,
    )
    namespace_stream = authz.client.ReadRelationships(ReadRelationshipsRequest(relationship_filter=platform_filter))
    all_user_namespace_ids: set[str] = {item.relationship.resource.object_id async for item in namespace_stream}
    logger.info(f"All user namespaces = {len(all_user_namespace_ids)}")
    logger.info(f"All user namespaces = {all_user_namespace_ids}")

    # Namespaces the anonymous user can already read are already public.
    anonymous_read = LookupResourcesRequest(
        resource_object_type=ResourceType.user_namespace.value,
        permission=Scope.READ.value,
        subject=SubjectReference(object=_AuthzConverter.anonymous_user()),
    )
    public_stream = authz.client.LookupResources(anonymous_read)
    public_user_namespace_ids: set[str] = {item.resource_object_id async for item in public_stream}
    logger.info(f"Public user namespaces = {len(public_user_namespace_ids)}")
    logger.info(f"Public user namespaces = {public_user_namespace_ids}")

    namespaces_to_process = all_user_namespace_ids - public_user_namespace_ids
    logger.info(f"User namespaces to process = {namespaces_to_process}")

    viewer_subjects = (
        SubjectReference(object=_AuthzConverter.all_users()),
        SubjectReference(object=_AuthzConverter.anonymous_users()),
    )
    for ns_id in namespaces_to_process:
        namespace_res = _AuthzConverter.user_namespace(ULID.from_str(ns_id))
        touches = [
            RelationshipUpdate(
                operation=RelationshipUpdate.OPERATION_TOUCH,
                relationship=Relationship(
                    resource=namespace_res,
                    relation=_Relation.public_viewer.value,
                    subject=viewer,
                ),
            )
            for viewer in viewer_subjects
        ]
        await authz.client.WriteRelationships(WriteRelationshipsRequest(updates=touches))
        logger.info(f"Made user namespace {ns_id} public")
267

268

269
async def migrate_storages_v2_to_data_connectors(config: SyncConfig) -> None:
    """Move storages_v2 to data_connectors.

    Fetches all cloud storage v2 items through the data connector migration tool and migrates them
    one at a time. If migrating an item fails, the error is logged together with its traceback, the
    item's ID is recorded, and the remaining items are still processed. A summary is logged at the
    end, followed by the IDs of any items that failed.

    Args:
        config: Background-job configuration providing the data connector migration tool.
    """
    logger = logging.getLogger("background_jobs").getChild(migrate_storages_v2_to_data_connectors.__name__)

    api_user = InternalServiceAdmin(id=ServiceAdminId.migrations)
    storages_v2 = await config.data_connector_migration_tool.get_storages_v2(requested_by=api_user)

    if not storages_v2:
        logger.info("Nothing to do.")
        return

    logger.info(f"Migrating {len(storages_v2)} cloud storage v2 items to data connectors.")
    failed_storages: list[str] = []
    for storage in storages_v2:
        try:
            data_connector = await config.data_connector_migration_tool.migrate_storage_v2(
                requested_by=api_user, storage=storage
            )
            logger.info(f"Migrated {storage.name} to {data_connector.namespace.slug}/{data_connector.slug}.")
            logger.info(f"Deleted storage_v2: {storage.storage_id}")
        except Exception:
            # Best effort: record the failure and keep going with the other items.
            # `logger.exception` emits one record that includes the error and its traceback.
            logger.exception(f"Failed to migrate {storage.name}.")
            failed_storages.append(str(storage.storage_id))

    num_migrated = len(storages_v2) - len(failed_storages)
    logger.info(f"Migrated {num_migrated}/{len(storages_v2)} data connectors.")
    if failed_storages:
        logger.error(f"Migration failed for storages: {failed_storages}.")
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc