• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 20161918364

12 Dec 2025 09:11AM UTC coverage: 86.221%. First build
20161918364

Pull #1134

github

web-flow
Merge bf4ca06d1 into 4ab8659ef
Pull Request #1134: build: update posthog to the latest version

0 of 1 new or added line in 1 file covered. (0.0%)

23897 of 27716 relevant lines covered (86.22%)

1.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.92
/bases/renku_data_services/data_tasks/task_defs.py
1
"""The task definitions in form of coroutines."""
2

3
import asyncio
2✔
4

5
from authzed.api.v1 import (
2✔
6
    Consistency,
7
    LookupResourcesRequest,
8
    ObjectReference,
9
    ReadRelationshipsRequest,
10
    Relationship,
11
    RelationshipFilter,
12
    RelationshipUpdate,
13
    SubjectFilter,
14
    SubjectReference,
15
    WriteRelationshipsRequest,
16
)
17
from ulid import ULID
2✔
18

19
import renku_data_services.authz.admin_sync as admin_sync
2✔
20
import renku_data_services.search.core as search_core
2✔
21
from renku_data_services import errors
2✔
22
from renku_data_services.app_config import logging
2✔
23
from renku_data_services.authz.authz import ResourceType, _AuthzConverter, _Relation
2✔
24
from renku_data_services.authz.models import Scope
2✔
25
from renku_data_services.base_models.core import InternalServiceAdmin, ServiceAdminId
2✔
26
from renku_data_services.base_models.metrics import MetricsEvent
2✔
27
from renku_data_services.data_tasks.dependencies import DependencyManager
2✔
28
from renku_data_services.data_tasks.taskman import TaskDefininions
2✔
29
from renku_data_services.namespace.models import NamespaceKind
2✔
30
from renku_data_services.solr.solr_client import DefaultSolrClient
2✔
31

32
logger = logging.getLogger(__name__)
2✔
33

34

35
async def update_search(dm: DependencyManager) -> None:
    """Continuously push staged search updates into SOLR.

    Every cycle opens a fresh ``DefaultSolrClient`` from ``dm.config.solr``,
    hands it together with ``dm.search_updates_repo`` to
    ``search_core.update_solr`` (third argument ``20``, presumably the batch
    size - confirm against ``update_solr``), closes the client and then waits
    one second. The loop has no exit condition of its own.
    """
    while True:
        # A new client per cycle, so no connection state survives between cycles.
        solr = DefaultSolrClient(dm.config.solr)
        async with solr as client:
            await search_core.update_solr(dm.search_updates_repo, client, 20)
        await asyncio.sleep(1)
41

42

43
async def send_metrics_to_posthog(dm: DependencyManager) -> None:
    """Forward pending product metrics to Posthog in an endless 10-second cycle.

    For each unprocessed metric, ``identify_user`` events are sent with
    ``Posthog.set`` and every other event with ``Posthog.capture``. Metrics that
    were sent without raising are deleted from the repository at the end of the
    cycle; metrics that failed are logged and left in place.

    The coroutine returns when ``asyncio.CancelledError`` or
    ``KeyboardInterrupt`` is raised while a cycle is being processed.
    """
    from posthog import Posthog

    posthog_config = dm.config.posthog
    client = Posthog(
        project_api_key=posthog_config.api_key,
        host=posthog_config.host,
        sync_mode=True,
        super_properties={"environment": posthog_config.environment},
    )

    while True:
        try:
            sent_ids = []
            async for metric in dm.metrics_repo.get_unprocessed_metrics():
                try:
                    is_identify = metric.event == MetricsEvent.identify_user.value
                    payload = {
                        "distinct_id": metric.anonymous_user_id,
                        "timestamp": metric.timestamp,
                        "properties": metric.metadata_ or {},
                        # This is sent to avoid duplicate events if multiple instances of data service are running.
                        # Posthog deduplicates events with the same timestamp, distinct_id, event, and uuid fields:
                        # https://github.com/PostHog/posthog/issues/17211#issuecomment-1723136534
                        "uuid": str(metric.id.to_uuid4()),
                    }
                    if is_identify:
                        client.set(**payload)
                    else:
                        client.capture(event=metric.event, **payload)
                except Exception as e:
                    logger.error(f"Failed to process metrics event {metric.id}: {e}")
                    continue
                # Only reached when the event was handed to Posthog without an error.
                sent_ids.append(metric.id)

            await dm.metrics_repo.delete_processed_metrics(sent_ids)
        except (asyncio.CancelledError, KeyboardInterrupt) as e:
            logger.warning(f"Exiting: {e}")
            return

        # NOTE: Sleep 10 seconds between processing cycles
        await asyncio.sleep(10)
94

95

96
async def generate_user_namespaces(dm: DependencyManager) -> None:
    """Periodically generate namespaces for users if there are none.

    Calls ``dm.group_repo.generate_user_namespaces()`` and then sleeps for
    ``dm.config.short_task_period_s`` seconds, forever.

    Raises:
        asyncio.CancelledError: if the task is cancelled, whether during the
            repository call or during the sleep.
        KeyboardInterrupt: if interrupted during the repository call.
    """
    while True:
        try:
            await dm.group_repo.generate_user_namespaces()
        except (asyncio.CancelledError, KeyboardInterrupt) as e:
            logger.warning(f"Exiting: {e}")
            # Re-raise so the task really stops. Swallowing the exception here
            # would restart the loop immediately (the sleep lives in ``else``),
            # making the task impossible to cancel while it is doing work.
            raise
        else:
            await asyncio.sleep(dm.config.short_task_period_s)
105

106

107
async def sync_user_namespaces(dm: DependencyManager) -> None:
    """Write every user namespace found in the database to Authzed.

    Iterates over ``dm.group_repo._get_user_namespaces()`` and, for each
    namespace, sends the relationships produced by
    ``dm.authz._add_user_namespace`` to the authorization database inside a
    database transaction. A failure for one namespace is logged and does not
    stop the remaining namespaces from being processed.
    """
    user_namespaces = dm.group_repo._get_user_namespaces()
    logger.info("Start syncing user namespaces to the authorization DB and message queue")
    num_authz: int = 0
    # NOTE(review): nothing in this function increments ``num_events``, so the
    # final log line always reports 0 event-queue writes. Confirm whether the
    # event-queue write was removed on purpose before changing the message.
    num_events: int = 0
    num_total: int = 0
    async for user_namespace in user_namespaces:
        num_total += 1
        authz_change = dm.authz._add_user_namespace(user_namespace.namespace)
        session = dm.config.db.async_session_maker()
        try:
            # Starting the transaction is inside the ``try`` so the session is
            # closed even if the transaction cannot be started.
            tx = session.begin()
            await tx.start()
            try:
                await dm.authz.client.WriteRelationships(authz_change.apply)
            except Exception as err:
                # NOTE: We do not rollback the authz changes here because it is OK if something is in Authz DB
                # but not in the message queue but not vice-versa.
                logger.error(f"Failed to sync user namespace {user_namespace} because {err}")
                await tx.rollback()
            else:
                num_authz += 1
                await tx.commit()
        finally:
            await session.close()
    logger.info(f"Wrote authorization changes for {num_authz}/{num_total} user namespaces")
    logger.info(f"Wrote to event queue database for {num_events}/{num_total} user namespaces")
134

135

136
async def bootstrap_user_namespaces(dm: DependencyManager) -> None:
    """Synchronize user namespaces to the authorization database only if none are already present.

    Each cycle reads up to five ``owner`` relationships of user namespaces from
    Authzed. If five are found the task logs that fact and returns for good;
    otherwise ``sync_user_namespaces`` is run. With ``dm.config.dummy_stores``
    set the task returns after a single successful cycle, otherwise it sleeps
    ``dm.config.short_task_period_s`` seconds and repeats.

    Raises:
        asyncio.CancelledError: if the task is cancelled.
        KeyboardInterrupt: if interrupted while a cycle is running.
    """
    while True:
        try:
            rels = aiter(
                dm.authz.client.ReadRelationships(
                    ReadRelationshipsRequest(
                        relationship_filter=RelationshipFilter(
                            resource_type=ResourceType.user_namespace.value, optional_relation=_Relation.owner.value
                        )
                    )
                )
            )
            # Only the first five results matter: that is the threshold for
            # deciding that the authorization database is already populated.
            num_rels = 0
            for _ in range(5):
                if await anext(rels, None) is not None:
                    num_rels += 1
            if num_rels >= 5:
                logger.info(
                    "Found at least 5 user namespace in the authorization database, "
                    "will not sync user namespaces to authorization."
                )
                return
            await sync_user_namespaces(dm)
        except (asyncio.CancelledError, KeyboardInterrupt) as e:
            logger.warning(f"Exiting: {e}")
            # Re-raise so the task really stops. Swallowing the exception would
            # restart the loop immediately, without sleeping, and make the task
            # impossible to cancel while it is doing work.
            raise
        else:
            if dm.config.dummy_stores:
                # only run once in tests
                return
            await asyncio.sleep(dm.config.short_task_period_s)
167

168

169
async def fix_mismatched_project_namespace_ids(dm: DependencyManager) -> None:
    """Fixes a problem where the project namespace relationship for projects has the wrong group ID.

    Each cycle reads all ``project_namespace`` relationships between projects
    and groups from Authzed (fully consistent read) and, for every one:

    * deletes the relationship when the project cannot be found;
    * skips it when the project's namespace is not a group;
    * replaces it with a relationship to the correct group when the group ID
      stored in Authzed differs from the project's namespace resource ID.

    With ``dm.config.dummy_stores`` set the task returns after one successful
    cycle, otherwise it sleeps ``dm.config.short_task_period_s`` seconds and
    repeats.

    Raises:
        asyncio.CancelledError: if the task is cancelled.
        KeyboardInterrupt: if interrupted while a cycle is running.
    """
    while True:
        try:
            api_user = InternalServiceAdmin(id=ServiceAdminId.migrations)
            res = dm.authz.client.ReadRelationships(
                ReadRelationshipsRequest(
                    consistency=Consistency(fully_consistent=True),
                    relationship_filter=RelationshipFilter(
                        # ``.value`` for consistency with every other filter in this module.
                        resource_type=ResourceType.project.value,
                        optional_relation=_Relation.project_namespace.value,
                        optional_subject_filter=SubjectFilter(subject_type=ResourceType.group.value),
                    ),
                )
            )
            async for rel in res:
                logger.info(f"Checking project namespace - group relation {rel} for correct group ID")
                project_id = rel.relationship.resource.object_id
                try:
                    project = await dm.project_repo.get_project(api_user, project_id)
                except errors.MissingResourceError:
                    logger.info(f"Couldn't find project {project_id}, deleting relation")
                    await dm.authz.client.WriteRelationships(
                        WriteRelationshipsRequest(
                            updates=[
                                RelationshipUpdate(
                                    operation=RelationshipUpdate.OPERATION_DELETE,
                                    relationship=rel.relationship,
                                ),
                            ]
                        )
                    )
                    continue

                if project.namespace.kind != NamespaceKind.group:
                    continue
                # Authzed object IDs are strings, so compare (and later write)
                # the string form of the expected ID.
                correct_group_id = str(project.namespace.underlying_resource_id)
                authzed_group_id = rel.relationship.subject.object.object_id
                if authzed_group_id == correct_group_id:
                    continue

                logger.info(
                    f"The project namespace ID in Authzed {authzed_group_id} "
                    f"does not match the expected group ID {correct_group_id}, correcting it..."
                )
                corrected_relationship = Relationship(
                    resource=rel.relationship.resource,
                    relation=rel.relationship.relation,
                    subject=SubjectReference(
                        object=ObjectReference(object_type=ResourceType.group.value, object_id=correct_group_id)
                    ),
                )
                # Add the corrected relationship and drop the stale one in a single request.
                await dm.authz.client.WriteRelationships(
                    WriteRelationshipsRequest(
                        updates=[
                            RelationshipUpdate(
                                operation=RelationshipUpdate.OPERATION_TOUCH,
                                relationship=corrected_relationship,
                            ),
                            RelationshipUpdate(
                                operation=RelationshipUpdate.OPERATION_DELETE,
                                relationship=rel.relationship,
                            ),
                        ]
                    )
                )
        except (asyncio.CancelledError, KeyboardInterrupt) as e:
            logger.warning(f"Exiting: {e}")
            # Re-raise so the task really stops. Swallowing the exception would
            # restart the loop immediately, without sleeping, and make the task
            # impossible to cancel while it is doing work.
            raise
        else:
            if dm.config.dummy_stores:
                # only run once in tests
                return
            await asyncio.sleep(dm.config.short_task_period_s)
241

242

243
async def migrate_groups_make_all_public(dm: DependencyManager) -> None:
    """Update existing groups to make them public.

    Each cycle collects the IDs of all groups that have a ``group_platform``
    relationship in Authzed, subtracts the groups the anonymous user can
    already read, and writes ``public_viewer`` relationships for all users and
    all anonymous users on every remaining group.

    With ``dm.config.dummy_stores`` set the task returns after one successful
    cycle, otherwise it sleeps ``dm.config.short_task_period_s`` seconds and
    repeats.

    Raises:
        asyncio.CancelledError: if the task is cancelled.
        KeyboardInterrupt: if interrupted while a cycle is running.
    """
    while True:
        try:
            all_groups = dm.authz.client.ReadRelationships(
                ReadRelationshipsRequest(
                    relationship_filter=RelationshipFilter(
                        resource_type=ResourceType.group.value,
                        optional_relation=_Relation.group_platform.value,
                    )
                )
            )
            all_group_ids: set[str] = {group.relationship.resource.object_id async for group in all_groups}
            logger.info(f"All groups = {len(all_group_ids)}")
            logger.info(f"All groups = {all_group_ids}")

            public_groups = dm.authz.client.LookupResources(
                LookupResourcesRequest(
                    resource_object_type=ResourceType.group.value,
                    permission=Scope.READ.value,
                    subject=SubjectReference(object=_AuthzConverter.anonymous_user()),
                )
            )
            public_group_ids: set[str] = {group.resource_object_id async for group in public_groups}
            logger.info(f"Public groups = {len(public_group_ids)}")
            logger.info(f"Public groups = {public_group_ids}")

            # Groups the anonymous user cannot read yet still need to be made public.
            groups_to_process = all_group_ids - public_group_ids
            logger.info(f"Groups to process = {groups_to_process}")

            all_users = SubjectReference(object=_AuthzConverter.all_users())
            all_anon_users = SubjectReference(object=_AuthzConverter.anonymous_users())
            for group_id in groups_to_process:
                group_res = _AuthzConverter.group(ULID.from_str(group_id))
                all_users_are_viewers = Relationship(
                    resource=group_res,
                    relation=_Relation.public_viewer.value,
                    subject=all_users,
                )
                all_anon_users_are_viewers = Relationship(
                    resource=group_res,
                    relation=_Relation.public_viewer.value,
                    subject=all_anon_users,
                )
                authz_change = WriteRelationshipsRequest(
                    updates=[
                        RelationshipUpdate(operation=RelationshipUpdate.OPERATION_TOUCH, relationship=rel)
                        for rel in [all_users_are_viewers, all_anon_users_are_viewers]
                    ]
                )
                await dm.authz.client.WriteRelationships(authz_change)
                logger.info(f"Made group {group_id} public")
        except (asyncio.CancelledError, KeyboardInterrupt) as e:
            logger.warning(f"Exiting: {e}")
            # Re-raise so the task really stops. Swallowing the exception would
            # restart the loop immediately, without sleeping, and make the task
            # impossible to cancel while it is doing work.
            raise
        else:
            if dm.config.dummy_stores:
                # only run once in tests
                return
            await asyncio.sleep(dm.config.short_task_period_s)
306

307

308
async def migrate_user_namespaces_make_all_public(dm: DependencyManager) -> None:
    """Update existing user namespaces to make them public.

    Each cycle collects the IDs of all user namespaces that have a
    ``user_namespace_platform`` relationship in Authzed, subtracts the ones the
    anonymous user can already read, and writes ``public_viewer`` relationships
    for all users and all anonymous users on every remaining namespace.

    With ``dm.config.dummy_stores`` set the task returns after one successful
    cycle, otherwise it sleeps ``dm.config.short_task_period_s`` seconds and
    repeats.

    Raises:
        asyncio.CancelledError: if the task is cancelled.
        KeyboardInterrupt: if interrupted while a cycle is running.
    """
    while True:
        try:
            all_user_namespaces = dm.authz.client.ReadRelationships(
                ReadRelationshipsRequest(
                    relationship_filter=RelationshipFilter(
                        resource_type=ResourceType.user_namespace.value,
                        optional_relation=_Relation.user_namespace_platform.value,
                    )
                )
            )
            all_user_namespace_ids: set[str] = {
                ns.relationship.resource.object_id async for ns in all_user_namespaces
            }
            logger.info(f"All user namespaces = {len(all_user_namespace_ids)}")
            logger.info(f"All user namespaces = {all_user_namespace_ids}")

            public_user_namespaces = dm.authz.client.LookupResources(
                LookupResourcesRequest(
                    resource_object_type=ResourceType.user_namespace.value,
                    permission=Scope.READ.value,
                    subject=SubjectReference(object=_AuthzConverter.anonymous_user()),
                )
            )
            public_user_namespace_ids: set[str] = {ns.resource_object_id async for ns in public_user_namespaces}
            logger.info(f"Public user namespaces = {len(public_user_namespace_ids)}")
            logger.info(f"Public user namespaces = {public_user_namespace_ids}")

            # Namespaces the anonymous user cannot read yet still need to be made public.
            namespaces_to_process = all_user_namespace_ids - public_user_namespace_ids
            logger.info(f"User namespaces to process = {namespaces_to_process}")

            all_users = SubjectReference(object=_AuthzConverter.all_users())
            all_anon_users = SubjectReference(object=_AuthzConverter.anonymous_users())
            for ns_id in namespaces_to_process:
                namespace_res = _AuthzConverter.user_namespace(ULID.from_str(ns_id))
                all_users_are_viewers = Relationship(
                    resource=namespace_res,
                    relation=_Relation.public_viewer.value,
                    subject=all_users,
                )
                all_anon_users_are_viewers = Relationship(
                    resource=namespace_res,
                    relation=_Relation.public_viewer.value,
                    subject=all_anon_users,
                )
                authz_change = WriteRelationshipsRequest(
                    updates=[
                        RelationshipUpdate(operation=RelationshipUpdate.OPERATION_TOUCH, relationship=rel)
                        for rel in [all_users_are_viewers, all_anon_users_are_viewers]
                    ]
                )
                await dm.authz.client.WriteRelationships(authz_change)
                logger.info(f"Made user namespace {ns_id} public")
        except (asyncio.CancelledError, KeyboardInterrupt) as e:
            logger.warning(f"Exiting: {e}")
            # Re-raise so the task really stops. Swallowing the exception would
            # restart the loop immediately, without sleeping, and make the task
            # impossible to cancel while it is doing work.
            raise
        else:
            if dm.config.dummy_stores:
                # only run once in tests
                return
            await asyncio.sleep(dm.config.short_task_period_s)
371

372

373
async def users_sync(dm: DependencyManager) -> None:
    """Periodically sync all users from keycloak.

    Calls ``dm.syncer.users_sync(dm.kc_api)`` and then sleeps for
    ``dm.config.long_task_period_s`` seconds, forever.

    Raises:
        asyncio.CancelledError: if the task is cancelled, whether during the
            sync or during the sleep.
        KeyboardInterrupt: if interrupted during the sync.
    """
    while True:
        try:
            await dm.syncer.users_sync(dm.kc_api)
        except (asyncio.CancelledError, KeyboardInterrupt) as e:
            logger.warning(f"Exiting: {e}")
            # Re-raise so the task really stops. Swallowing the exception would
            # restart the sync immediately, without sleeping, and make the task
            # impossible to cancel while it is doing work.
            raise
        else:
            await asyncio.sleep(dm.config.long_task_period_s)
383

384

385
async def sync_admins_from_keycloak(dm: DependencyManager) -> None:
    """Periodically sync admins from keycloak into the authorization database.

    Calls ``admin_sync.sync_admins_from_keycloak(dm.kc_api, dm.authz)`` and then
    sleeps for ``dm.config.long_task_period_s`` seconds, forever.

    Raises:
        asyncio.CancelledError: if the task is cancelled, whether during the
            sync or during the sleep.
        KeyboardInterrupt: if interrupted during the sync.
    """
    while True:
        try:
            await admin_sync.sync_admins_from_keycloak(dm.kc_api, dm.authz)
        except (asyncio.CancelledError, KeyboardInterrupt) as e:
            logger.warning(f"Exiting: {e}")
            # Re-raise so the task really stops. Swallowing the exception would
            # restart the sync immediately, without sleeping, and make the task
            # impossible to cancel while it is doing work.
            raise
        else:
            await asyncio.sleep(dm.config.long_task_period_s)
395

396

397
def all_tasks(dm: DependencyManager) -> TaskDefininions:
    """Build the task definitions managed in main.

    Returns a ``TaskDefininions`` wrapping a dict that maps each task name to a
    zero-argument factory; calling a factory creates a new coroutine for that
    task, bound to ``dm``.
    """
    # Impl. note: We pass the entire config to the coroutines, because
    # should such a task fail it will be restarted, which means the
    # coroutine is re-created. In this case it might be better to also
    # re-create its entire state. If we pass already created
    # repositories or other services (and they are not stateless) we
    # might capture this state and possibly won't recover by
    # re-entering the coroutine.
    factories = {
        "update_search": lambda: update_search(dm),
        "send_product_metrics": lambda: send_metrics_to_posthog(dm),
        "generate_user_namespace": lambda: generate_user_namespaces(dm),
        "bootstrap_user_namespaces": lambda: bootstrap_user_namespaces(dm),
        "fix_mismatched_project_namespace_ids": lambda: fix_mismatched_project_namespace_ids(dm),
        "migrate_groups_make_all_public": lambda: migrate_groups_make_all_public(dm),
        "migrate_user_namespaces_make_all_public": lambda: migrate_user_namespaces_make_all_public(dm),
        "users_sync": lambda: users_sync(dm),
        "sync_admins_from_keycloak": lambda: sync_admins_from_keycloak(dm),
    }
    return TaskDefininions(factories)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc