• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-python / 4145649460

pending completion
4145649460

push

github-actions

GitHub
Merge branch 'develop' into allow-ref-target-for-release-action

25096 of 28903 relevant lines covered (86.83%)

4.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

42.11
/renku/ui/service/worker.py
1
# -*- coding: utf-8 -*-
2
#
3
# Copyright 2020 - Swiss Data Science Center (SDSC)
4
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
5
# Eidgenössische Technische Hochschule Zürich (ETHZ).
6
#
7
# Licensed under the Apache License, Version 2.0 (the "License");
8
# you may not use this file except in compliance with the License.
9
# You may obtain a copy of the License at
10
#
11
#     http://www.apache.org/licenses/LICENSE-2.0
12
#
13
# Unless required by applicable law or agreed to in writing, software
14
# distributed under the License is distributed on an "AS IS" BASIS,
15
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
# See the License for the specific language governing permissions and
17
# limitations under the License.
18
"""Renku service worker."""
2✔
19
import os
2✔
20
from contextlib import contextmanager
2✔
21

22
import sentry_sdk
2✔
23
from rq import Worker
2✔
24
from sentry_sdk.integrations.rq import RqIntegration
2✔
25

26
from renku.core.errors import ConfigurationError, UsageError
2✔
27
from renku.ui.service.config import SENTRY_ENABLED, SENTRY_SAMPLERATE
2✔
28
from renku.ui.service.jobs.queues import QUEUES, WorkerQueues
2✔
29
from renku.ui.service.logger import DEPLOYMENT_LOG_LEVEL, worker_log
2✔
30

31
if SENTRY_ENABLED:
    # Sentry reporting is opt-in; the DSN and environment name are read from
    # the process environment and the RQ integration is registered.
    sentry_options = dict(
        dsn=os.getenv("SENTRY_DSN"),
        environment=os.getenv("SENTRY_ENV"),
        traces_sample_rate=float(SENTRY_SAMPLERATE),
        integrations=[RqIntegration()],
    )
    sentry_sdk.init(**sentry_options)
38

39

40
@contextmanager
def worker(queue_list):
    """Context manager that yields an RQ worker for ``queue_list``.

    The worker is created on ``WorkerQueues.connection`` with job-description
    logging turned off. Nothing is done when the context exits.

    :param queue_list: Queues the worker should listen on; passed unchanged to ``Worker``.
    """
    # NOTE: logging configuration has been moved to `.work(logging_level=)`
    worker_log.info(f"worker log level set to {DEPLOYMENT_LOG_LEVEL}")

    rq_worker = Worker(queue_list, connection=WorkerQueues.connection, log_job_description=False)
    worker_log.info("worker created")

    yield rq_worker
×
55

56

57
def check_queues(queue_list):
    """Validate that every non-empty name in ``queue_list`` is a known queue.

    Falsy (e.g. empty) names are skipped rather than rejected.

    :param queue_list: Iterable of queue names to validate.
    :raises UsageError: For the first non-empty name that is not in ``QUEUES``.
    """
    # A truthy name can never be ``None``, so ``None`` is a safe "nothing found" marker.
    unknown = next((name for name in queue_list if name and name not in QUEUES), None)
    if unknown is None:
        return

    template = "Invalid queue name: {0}\n\nValid queue names: \n{1}"
    raise UsageError(template.format(unknown, "\n".join(QUEUES)))
×
63

64

65
def start_worker(queue_list):
    """Start a worker listening on the given queues.

    Queue names are stripped of surrounding whitespace and blank names are
    dropped. The cleaned list is what gets validated, logged and handed to
    the worker, so a blank or padded name can no longer bypass validation
    and still reach the worker.

    :param queue_list: Iterable of queue name strings.
    :raises UsageError: If a cleaned name is rejected by ``check_queues``.
    """
    queue_names = [name.strip() for name in queue_list if name.strip()]
    check_queues(queue_names)

    worker_log.info(f"working on queues: {queue_names}")

    with worker(queue_names) as rq_worker:
        worker_log.info("running worker")
        rq_worker.work(logging_level=DEPLOYMENT_LOG_LEVEL)
×
75

76

77
if __name__ == "__main__":
    # Script entry point: the queues to serve are given as a comma-separated
    # list in the RENKU_SVC_WORKER_QUEUES environment variable.
    queues = os.getenv("RENKU_SVC_WORKER_QUEUES")
    worker_log.info(f"working on queues: {queues}")

    if not queues:
        raise ConfigurationError(
            "Worker queues not specified. Please, set RENKU_SVC_WORKER_QUEUES environment variable."
        )

    # Split on commas and trim whitespace around each name.
    start_worker(list(map(str.strip, queues.strip().split(","))))
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc