• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 18344758868

08 Oct 2025 12:32PM UTC coverage: 83.337% (-0.04%) from 83.375%
18344758868

Pull #1053

github

web-flow
Merge 737eb57da into 850788942
Pull Request #1053: fix: run events_sync() in data tasks

1 of 7 new or added lines in 1 file covered. (14.29%)

6 existing lines in 4 files now uncovered.

21666 of 25998 relevant lines covered (83.34%)

1.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.59
/bases/renku_data_services/data_tasks/task_defs.py
1
"""The task definitions in form of coroutines."""
2

3
import asyncio
2✔
4

5
from authzed.api.v1 import (
2✔
6
    Consistency,
7
    LookupResourcesRequest,
8
    ObjectReference,
9
    ReadRelationshipsRequest,
10
    Relationship,
11
    RelationshipFilter,
12
    RelationshipUpdate,
13
    SubjectFilter,
14
    SubjectReference,
15
    WriteRelationshipsRequest,
16
)
17
from ulid import ULID
2✔
18

19
import renku_data_services.authz.admin_sync as admin_sync
2✔
20
import renku_data_services.search.core as search_core
2✔
21
from renku_data_services import errors
2✔
22
from renku_data_services.app_config import logging
2✔
23
from renku_data_services.authz.authz import ResourceType, _AuthzConverter, _Relation
2✔
24
from renku_data_services.authz.models import Scope
2✔
25
from renku_data_services.base_models.core import InternalServiceAdmin, ServiceAdminId
2✔
26
from renku_data_services.data_tasks.dependencies import DependencyManager
2✔
27
from renku_data_services.data_tasks.taskman import TaskDefininions
2✔
28
from renku_data_services.namespace.models import NamespaceKind
2✔
29
from renku_data_services.solr.solr_client import DefaultSolrClient
2✔
30

31
logger = logging.getLogger(__name__)
2✔
32

33

34
async def update_search(dm: DependencyManager) -> None:
    """Keep pushing staged search updates into SOLR.

    Every pass opens a new SOLR client built from ``dm.config.solr``, passes it
    to ``search_core.update_solr`` together with ``dm.search_updates_repo`` and
    the literal ``20`` (presumably a batch size -- confirm against
    ``update_solr``), leaves the client context and then pauses for one second.

    The loop has no exit condition of its own; it ends only when an exception
    propagates out of it.
    """
    update_arg = 20
    pause_s = 1
    while True:
        solr = DefaultSolrClient(dm.config.solr)
        # A fresh client per pass, so nothing from the client is carried between passes.
        async with solr as client:
            await search_core.update_solr(dm.search_updates_repo, client, update_arg)
        await asyncio.sleep(pause_s)
40

41

42
async def send_metrics_to_posthog(dm: DependencyManager) -> None:
    """Forward unprocessed product metrics to Posthog in an endless cycle.

    A Posthog client is created once from ``dm.config.posthog``. Each cycle
    iterates over ``dm.metrics_repo.get_unprocessed_metrics()``, captures every
    metric in Posthog, and afterwards asks the repository to delete the metrics
    that were captured without error. A metric whose capture raises is logged
    and left out of the deletion.

    The coroutine returns when ``asyncio.CancelledError`` or
    ``KeyboardInterrupt`` is raised during a cycle; otherwise it waits ten
    seconds and starts the next cycle.
    """
    from posthog import Posthog

    posthog_cfg = dm.config.posthog
    client = Posthog(
        api_key=posthog_cfg.api_key,
        host=posthog_cfg.host,
        sync_mode=True,
        super_properties={"environment": posthog_cfg.environment},
    )

    while True:
        try:
            pending = dm.metrics_repo.get_unprocessed_metrics()
            delivered = []
            async for metric in pending:
                try:
                    client.capture(
                        distinct_id=metric.anonymous_user_id,
                        timestamp=metric.timestamp,
                        event=metric.event,
                        properties=metric.metadata_ or {},
                        # The uuid guards against duplicate events when several data service
                        # instances run at once: Posthog deduplicates events that share
                        # timestamp, distinct_id, event and uuid, see
                        # https://github.com/PostHog/posthog/issues/17211#issuecomment-1723136534
                        uuid=metric.id.to_uuid4(),
                    )
                except Exception as e:
                    logger.error(f"Failed to process metrics event {metric.id}: {e}")
                    continue
                # Only metrics that were captured without error are scheduled for deletion.
                delivered.append(metric.id)

            await dm.metrics_repo.delete_processed_metrics(delivered)
        except (asyncio.CancelledError, KeyboardInterrupt) as e:
            logger.warning(f"Exiting: {e}")
            return

        # NOTE: Pause 10 seconds between processing cycles
        await asyncio.sleep(10)
82

83

84
async def generate_user_namespaces(dm: DependencyManager) -> None:
    """Periodically generate namespaces for users if there are none.

    Each pass awaits ``dm.group_repo.generate_user_namespaces()`` and then
    sleeps for ``dm.config.short_task_period_s`` seconds.

    The coroutine returns when ``asyncio.CancelledError`` or
    ``KeyboardInterrupt`` is raised during a pass. Any other exception
    propagates to the caller.
    """
    while True:
        try:
            await dm.group_repo.generate_user_namespaces()
        except (asyncio.CancelledError, KeyboardInterrupt) as e:
            logger.warning(f"Exiting: {e}")
            # Stop here: without the return the loop would swallow the cancellation
            # and immediately start another pass without sleeping.
            return
        else:
            await asyncio.sleep(dm.config.short_task_period_s)
93

94

95
async def sync_user_namespaces(dm: DependencyManager) -> None:
    """Write the authorization relationships of every user namespace in the database.

    Iterates over ``dm.group_repo._get_user_namespaces()`` and, for each entry,
    sends the change produced by ``dm.authz._add_user_namespace`` to the
    authorization service. A failure for one namespace is logged and does not
    stop the remaining namespaces from being processed.

    NOTE(review): a database session and transaction are opened around each
    write, but nothing visible here is written through that session, and the
    event counter reported at the end is never incremented in this function.
    Both look like leftovers of an event-queue write -- confirm before relying
    on the second summary line.
    """
    namespaces = dm.group_repo._get_user_namespaces()
    logger.info("Start syncing user namespaces to the authorization DB and message queue")
    authz_written: int = 0
    events_written: int = 0
    seen: int = 0
    async for entry in namespaces:
        seen += 1
        change = dm.authz._add_user_namespace(entry.namespace)
        db_session = dm.config.db.async_session_maker()
        transaction = db_session.begin()
        await transaction.start()
        try:
            await dm.authz.client.WriteRelationships(change.apply)
        except Exception as err:
            # NOTE: The authz changes are deliberately not rolled back: an entry that exists
            # in the Authz DB but not in the message queue is acceptable, the reverse is not.
            logger.error(f"Failed to sync user namespace {entry} because {err}")
            await transaction.rollback()
        else:
            # Count the write before committing, exactly as if it were counted right after the call.
            authz_written += 1
            await transaction.commit()
        finally:
            await db_session.close()
    logger.info(f"Wrote authorization changes for {authz_written}/{seen} user namespaces")
    logger.info(f"Wrote to event queue database for {events_written}/{seen} user namespaces")
122

123

124
async def bootstrap_user_namespaces(dm: DependencyManager) -> None:
    """Synchronize user namespaces to the authorization database only if none are already present.

    Each pass reads up to five ``owner`` relationships of user namespaces from
    the authorization service. If all five exist the coroutine returns for good;
    otherwise ``sync_user_namespaces`` is run. After a successful pass the
    coroutine returns when ``dm.config.dummy_stores`` is set (tests), or else
    sleeps for ``dm.config.short_task_period_s`` seconds and repeats.

    The coroutine also returns when ``asyncio.CancelledError`` or
    ``KeyboardInterrupt`` is raised during a pass. Any other exception
    propagates to the caller.
    """
    while True:
        try:
            rels = aiter(
                dm.authz.client.ReadRelationships(
                    ReadRelationshipsRequest(
                        relationship_filter=RelationshipFilter(
                            resource_type=ResourceType.user_namespace.value, optional_relation=_Relation.owner.value
                        )
                    )
                )
            )
            # Only probe the first five results; that is enough to decide whether a sync is needed.
            num_rels = 0
            for _ in range(5):
                if await anext(rels, None) is not None:
                    num_rels += 1
            if num_rels >= 5:
                logger.info(
                    "Found at least 5 user namespace in the authorization database, "
                    "will not sync user namespaces to authorization."
                )
                return
            await sync_user_namespaces(dm)
        except (asyncio.CancelledError, KeyboardInterrupt) as e:
            logger.warning(f"Exiting: {e}")
            # Stop here: without the return the loop would swallow the cancellation
            # and immediately start another pass without sleeping.
            return
        else:
            if dm.config.dummy_stores:
                # only run once in tests
                return
            await asyncio.sleep(dm.config.short_task_period_s)
155

156

157
async def fix_mismatched_project_namespace_ids(dm: DependencyManager) -> None:
    """Fixes a problem where the project namespace relationship for projects has the wrong group ID.

    Each pass reads every ``project_namespace`` relationship whose subject is a
    group and compares the group ID stored in the authorization service with
    the one of the project's namespace in the database:

    * the project no longer exists -> the relationship is deleted;
    * the project's namespace is not a group -> the relationship is left alone;
    * the IDs differ -> the correct relationship is written and the stale one
      is deleted in a single request.

    After a successful pass the coroutine returns when
    ``dm.config.dummy_stores`` is set (tests), or else sleeps for
    ``dm.config.short_task_period_s`` seconds and repeats. It also returns when
    ``asyncio.CancelledError`` or ``KeyboardInterrupt`` is raised during a
    pass. Any other exception propagates to the caller.
    """
    while True:
        try:
            api_user = InternalServiceAdmin(id=ServiceAdminId.migrations)
            res = dm.authz.client.ReadRelationships(
                ReadRelationshipsRequest(
                    consistency=Consistency(fully_consistent=True),
                    relationship_filter=RelationshipFilter(
                        # Pass the plain string value, like every other request built in this module.
                        resource_type=ResourceType.project.value,
                        optional_relation=_Relation.project_namespace.value,
                        optional_subject_filter=SubjectFilter(subject_type=ResourceType.group.value),
                    ),
                )
            )
            async for rel in res:
                logger.info(f"Checking project namespace - group relation {rel} for correct group ID")
                project_id = rel.relationship.resource.object_id
                try:
                    project = await dm.project_repo.get_project(api_user, project_id)
                except errors.MissingResourceError:
                    logger.info(f"Couldn't find project {project_id}, deleting relation")
                    await dm.authz.client.WriteRelationships(
                        WriteRelationshipsRequest(
                            updates=[
                                RelationshipUpdate(
                                    operation=RelationshipUpdate.OPERATION_DELETE,
                                    relationship=rel.relationship,
                                ),
                            ]
                        )
                    )
                    continue

                if project.namespace.kind != NamespaceKind.group:
                    continue
                correct_group_id = project.namespace.underlying_resource_id
                authzed_group_id = rel.relationship.subject.object.object_id
                if authzed_group_id != correct_group_id:
                    logger.info(
                        f"The project namespace ID in Authzed {authzed_group_id} "
                        f"does not match the expected group ID {correct_group_id}, correcting it..."
                    )
                    # Write the corrected relationship and drop the stale one in the same request.
                    await dm.authz.client.WriteRelationships(
                        WriteRelationshipsRequest(
                            updates=[
                                RelationshipUpdate(
                                    operation=RelationshipUpdate.OPERATION_TOUCH,
                                    relationship=Relationship(
                                        resource=rel.relationship.resource,
                                        relation=rel.relationship.relation,
                                        subject=SubjectReference(
                                            object=ObjectReference(
                                                object_type=ResourceType.group.value, object_id=str(correct_group_id)
                                            )
                                        ),
                                    ),
                                ),
                                RelationshipUpdate(
                                    operation=RelationshipUpdate.OPERATION_DELETE,
                                    relationship=rel.relationship,
                                ),
                            ]
                        )
                    )
        except (asyncio.CancelledError, KeyboardInterrupt) as e:
            logger.warning(f"Exiting: {e}")
            # Stop here: without the return the loop would swallow the cancellation
            # and immediately start another pass without sleeping.
            return
        else:
            if dm.config.dummy_stores:
                # only run once in tests
                return
            await asyncio.sleep(dm.config.short_task_period_s)
229

230

231
async def migrate_groups_make_all_public(dm: DependencyManager) -> None:
    """Update existing groups to make them public.

    Each pass collects the IDs of all groups that have a ``group_platform``
    relationship, subtracts the groups the anonymous user can already read,
    and for every remaining group writes ``public_viewer`` relationships for
    both "all users" and "all anonymous users".

    After a successful pass the coroutine returns when
    ``dm.config.dummy_stores`` is set (tests), or else sleeps for
    ``dm.config.short_task_period_s`` seconds and repeats. It also returns when
    ``asyncio.CancelledError`` or ``KeyboardInterrupt`` is raised during a
    pass. Any other exception propagates to the caller.
    """
    while True:
        try:
            all_groups = dm.authz.client.ReadRelationships(
                ReadRelationshipsRequest(
                    relationship_filter=RelationshipFilter(
                        resource_type=ResourceType.group.value,
                        optional_relation=_Relation.group_platform.value,
                    )
                )
            )
            all_group_ids: set[str] = set()
            async for group in all_groups:
                all_group_ids.add(group.relationship.resource.object_id)
            logger.info(f"All groups = {len(all_group_ids)}")
            logger.info(f"All groups = {all_group_ids}")

            # Groups readable by the anonymous user are already public.
            public_groups = dm.authz.client.LookupResources(
                LookupResourcesRequest(
                    resource_object_type=ResourceType.group.value,
                    permission=Scope.READ.value,
                    subject=SubjectReference(object=_AuthzConverter.anonymous_user()),
                )
            )
            public_group_ids: set[str] = set()
            async for group in public_groups:
                public_group_ids.add(group.resource_object_id)
            logger.info(f"Public groups = {len(public_group_ids)}")
            logger.info(f"Public groups = {public_group_ids}")

            groups_to_process = all_group_ids - public_group_ids
            logger.info(f"Groups to process = {groups_to_process}")

            all_users = SubjectReference(object=_AuthzConverter.all_users())
            all_anon_users = SubjectReference(object=_AuthzConverter.anonymous_users())
            for group_id in groups_to_process:
                group_res = _AuthzConverter.group(ULID.from_str(group_id))
                all_users_are_viewers = Relationship(
                    resource=group_res,
                    relation=_Relation.public_viewer.value,
                    subject=all_users,
                )
                all_anon_users_are_viewers = Relationship(
                    resource=group_res,
                    relation=_Relation.public_viewer.value,
                    subject=all_anon_users,
                )
                authz_change = WriteRelationshipsRequest(
                    updates=[
                        RelationshipUpdate(operation=RelationshipUpdate.OPERATION_TOUCH, relationship=rel)
                        for rel in [all_users_are_viewers, all_anon_users_are_viewers]
                    ]
                )
                await dm.authz.client.WriteRelationships(authz_change)
                logger.info(f"Made group {group_id} public")
        except (asyncio.CancelledError, KeyboardInterrupt) as e:
            logger.warning(f"Exiting: {e}")
            # Stop here: without the return the loop would swallow the cancellation
            # and immediately start another pass without sleeping.
            return
        else:
            if dm.config.dummy_stores:
                # only run once in tests
                return
            await asyncio.sleep(dm.config.short_task_period_s)
294

295

296
async def migrate_user_namespaces_make_all_public(dm: DependencyManager) -> None:
    """Update existing user namespaces to make them public.

    Each pass collects the IDs of all user namespaces that have a
    ``user_namespace_platform`` relationship, subtracts the ones the anonymous
    user can already read, and for every remaining namespace writes
    ``public_viewer`` relationships for both "all users" and "all anonymous
    users".

    After a successful pass the coroutine returns when
    ``dm.config.dummy_stores`` is set (tests), or else sleeps for
    ``dm.config.short_task_period_s`` seconds and repeats. It also returns when
    ``asyncio.CancelledError`` or ``KeyboardInterrupt`` is raised during a
    pass. Any other exception propagates to the caller.
    """
    while True:
        try:
            all_user_namespaces = dm.authz.client.ReadRelationships(
                ReadRelationshipsRequest(
                    relationship_filter=RelationshipFilter(
                        resource_type=ResourceType.user_namespace.value,
                        optional_relation=_Relation.user_namespace_platform.value,
                    )
                )
            )
            all_user_namespace_ids: set[str] = set()
            async for ns in all_user_namespaces:
                all_user_namespace_ids.add(ns.relationship.resource.object_id)
            logger.info(f"All user namespaces = {len(all_user_namespace_ids)}")
            logger.info(f"All user namespaces = {all_user_namespace_ids}")

            # Namespaces readable by the anonymous user are already public.
            public_user_namespaces = dm.authz.client.LookupResources(
                LookupResourcesRequest(
                    resource_object_type=ResourceType.user_namespace.value,
                    permission=Scope.READ.value,
                    subject=SubjectReference(object=_AuthzConverter.anonymous_user()),
                )
            )
            public_user_namespace_ids: set[str] = set()
            async for ns in public_user_namespaces:
                public_user_namespace_ids.add(ns.resource_object_id)
            logger.info(f"Public user namespaces = {len(public_user_namespace_ids)}")
            logger.info(f"Public user namespaces = {public_user_namespace_ids}")

            namespaces_to_process = all_user_namespace_ids - public_user_namespace_ids
            logger.info(f"User namespaces to process = {namespaces_to_process}")

            all_users = SubjectReference(object=_AuthzConverter.all_users())
            all_anon_users = SubjectReference(object=_AuthzConverter.anonymous_users())
            for ns_id in namespaces_to_process:
                namespace_res = _AuthzConverter.user_namespace(ULID.from_str(ns_id))
                all_users_are_viewers = Relationship(
                    resource=namespace_res,
                    relation=_Relation.public_viewer.value,
                    subject=all_users,
                )
                all_anon_users_are_viewers = Relationship(
                    resource=namespace_res,
                    relation=_Relation.public_viewer.value,
                    subject=all_anon_users,
                )
                authz_change = WriteRelationshipsRequest(
                    updates=[
                        RelationshipUpdate(operation=RelationshipUpdate.OPERATION_TOUCH, relationship=rel)
                        for rel in [all_users_are_viewers, all_anon_users_are_viewers]
                    ]
                )
                await dm.authz.client.WriteRelationships(authz_change)
                logger.info(f"Made user namespace {ns_id} public")
        except (asyncio.CancelledError, KeyboardInterrupt) as e:
            logger.warning(f"Exiting: {e}")
            # Stop here: without the return the loop would swallow the cancellation
            # and immediately start another pass without sleeping.
            return
        else:
            if dm.config.dummy_stores:
                # only run once in tests
                return
            await asyncio.sleep(dm.config.short_task_period_s)
359

360

361
async def users_sync(dm: DependencyManager) -> None:
    """Periodically sync all users from keycloak.

    Each pass awaits ``dm.syncer.users_sync(dm.kc_api)`` and then sleeps for
    ``dm.config.long_task_period_s`` seconds.

    The coroutine returns when ``asyncio.CancelledError`` or
    ``KeyboardInterrupt`` is raised during a pass. Any other exception
    propagates to the caller.
    """
    while True:
        try:
            await dm.syncer.users_sync(dm.kc_api)
        except (asyncio.CancelledError, KeyboardInterrupt) as e:
            logger.warning(f"Exiting: {e}")
            # Stop here: without the return the loop would swallow the cancellation
            # and immediately start another pass without sleeping.
            return
        else:
            await asyncio.sleep(dm.config.long_task_period_s)
371

372

373
async def events_sync(dm: DependencyManager) -> None:
    """Periodically run the syncer's event-based sync against keycloak.

    Each pass awaits ``dm.syncer.events_sync(dm.kc_api)`` and then sleeps for
    ``dm.config.long_task_period_s`` seconds.

    The coroutine returns when ``asyncio.CancelledError`` or
    ``KeyboardInterrupt`` is raised during a pass. Any other exception
    propagates to the caller.
    """
    while True:
        try:
            await dm.syncer.events_sync(dm.kc_api)
        except (asyncio.CancelledError, KeyboardInterrupt) as e:
            logger.warning(f"Exiting: {e}")
            # Stop here: without the return the loop would swallow the cancellation
            # and immediately start another pass without sleeping.
            return
        else:
            await asyncio.sleep(dm.config.long_task_period_s)
383

384

385
async def sync_admins_from_keycloak(dm: DependencyManager) -> None:
    """Periodically sync admins from keycloak into the authorization service.

    Each pass awaits ``admin_sync.sync_admins_from_keycloak(dm.kc_api, dm.authz)``
    and then sleeps for ``dm.config.long_task_period_s`` seconds.

    The coroutine returns when ``asyncio.CancelledError`` or
    ``KeyboardInterrupt`` is raised during a pass. Any other exception
    propagates to the caller.
    """
    while True:
        try:
            await admin_sync.sync_admins_from_keycloak(dm.kc_api, dm.authz)
        except (asyncio.CancelledError, KeyboardInterrupt) as e:
            logger.warning(f"Exiting: {e}")
            # Stop here: without the return the loop would swallow the cancellation
            # and immediately start another pass without sleeping.
            return
        else:
            await asyncio.sleep(dm.config.long_task_period_s)
395

396

397
def all_tasks(dm: DependencyManager) -> TaskDefininions:
    """Build the named task factories that main manages.

    Every value is a zero-argument callable that creates a new coroutine for
    the task each time it is called.
    """
    # Impl. note: each factory hands the whole dependency manager to its
    # coroutine. A failed task is restarted, i.e. its coroutine is created
    # again, and then it is probably better to rebuild all of its state as
    # well. Passing in repositories or services that were created up front
    # (and are not stateless) could capture their state, and re-entering the
    # coroutine might then not recover.
    factories = {
        "update_search": lambda: update_search(dm),
        "send_product_metrics": lambda: send_metrics_to_posthog(dm),
        "generate_user_namespace": lambda: generate_user_namespaces(dm),
        "bootstrap_user_namespaces": lambda: bootstrap_user_namespaces(dm),
        "fix_mismatched_project_namespace_ids": lambda: fix_mismatched_project_namespace_ids(dm),
        "migrate_groups_make_all_public": lambda: migrate_groups_make_all_public(dm),
        "migrate_user_namespaces_make_all_public": lambda: migrate_user_namespaces_make_all_public(dm),
        "users_sync": lambda: users_sync(dm),
        "events_sync": lambda: events_sync(dm),
        "sync_admins_from_keycloak": lambda: sync_admins_from_keycloak(dm),
    }
    return TaskDefininions(factories)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc