• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-python / 5529030370

pending completion
5529030370

push

github-actions

Ralf Grubenmann
fix docker build, pin versions

24252 of 28479 relevant lines covered (85.16%)

2.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

29.47
/renku/ui/cli/service.py
1
# -*- coding: utf-8 -*-
2
#
3
# Copyright 2021 - Swiss Data Science Center (SDSC)
4
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
5
# Eidgenössische Technische Hochschule Zürich (ETHZ).
6
#
7
# Licensed under the Apache License, Version 2.0 (the "License");
8
# you may not use this file except in compliance with the License.
9
# You may obtain a copy of the License at
10
#
11
#     http://www.apache.org/licenses/LICENSE-2.0
12
#
13
# Unless required by applicable law or agreed to in writing, software
14
# distributed under the License is distributed on an "AS IS" BASIS,
15
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
# See the License for the specific language governing permissions and
17
# limitations under the License.
18
"""Commands to launch service components."""
6✔
19
import os
6✔
20
import signal
6✔
21
import subprocess
6✔
22
import sys
6✔
23
import tempfile
6✔
24
import time
6✔
25
from datetime import datetime
6✔
26
from pathlib import Path
6✔
27

28
import click
6✔
29
import psutil
6✔
30

31
import renku.ui.cli.utils.color as color
6✔
32
from renku.command.util import ERROR
6✔
33

34
RENKU_DAEMON_LOG_FILE = "renku.log"
6✔
35
RENKU_DAEMON_ERR_FILE = "renku.err"
6✔
36

37
SERVICE_COMPONENT_TAGS = ["api", "scheduler", "worker"]
6✔
38

39

40
def run_api(addr="0.0.0.0", port=8080, timeout=600):
6✔
41
    """Run service JSON-RPC API."""
42
    from gunicorn.app.wsgiapp import run
×
43

44
    svc_num_workers = os.getenv("RENKU_SVC_NUM_WORKERS", "1")
×
45
    svc_num_threads = os.getenv("RENKU_SVC_NUM_THREADS", "2")
×
46

47
    svc_timeout = int(os.getenv("REQUEST_TIMEOUT", timeout))
×
48

49
    loading_opt = "--preload"
×
50

51
    sys.argv = [
×
52
        "gunicorn",
53
        "renku.ui.service.entrypoint:app",
54
        loading_opt,
55
        "-b",
56
        f"{addr}:{port}",
57
        "--timeout",
58
        f"{svc_timeout}",
59
        "--workers",
60
        svc_num_workers,
61
        "--worker-class",
62
        "gthread",
63
        "--threads",
64
        svc_num_threads,
65
        "--log-level",
66
        "debug",
67
    ]
68

69
    sys.exit(run())
×
70

71

72
def run_worker(queues):
6✔
73
    """Run service workers."""
74
    from renku.ui.service.jobs.queues import QUEUES
×
75
    from renku.ui.service.worker import start_worker
×
76

77
    if not queues:
×
78
        queues = os.getenv("RENKU_SVC_WORKER_QUEUES", "")
×
79
        queues = [queue_name.strip() for queue_name in queues.strip().split(",") if queue_name.strip()]
×
80

81
        if not queues:
×
82
            queues = QUEUES
×
83

84
    start_worker(queues)
×
85

86

87
def check_cmdline(cmdline, include=None):
6✔
88
    """Check `cmdline` command of a process."""
89
    include = include or []
×
90
    service_components = include + SERVICE_COMPONENT_TAGS
×
91

92
    for cmd in service_components:
×
93
        if cmd in cmdline:
×
94
            return True
×
95

96
    return False
×
97

98

99
def is_renku_process(process, include):
6✔
100
    """Return true if this is a renku process."""
101
    process_name = process.name().lower()
×
102

103
    if process_name == "renku" and check_cmdline(process.cmdline(), include):
×
104
        return True
×
105
    elif "python" not in process_name:
×
106
        return False
×
107

108
    try:
×
109
        command_line = process.cmdline()
×
110
        if not check_cmdline(command_line, include):
×
111
            return False
×
112

113
        for line in command_line:
×
114
            if line.endswith("renku"):
×
115
                return True
×
116

117
    except (psutil.AccessDenied, psutil.NoSuchProcess):
×
118
        pass
×
119

120
    return False
×
121

122

123
def list_renku_processes(include=None):
6✔
124
    """List renku processes."""
125
    include = include or []
×
126

127
    renku_processes_all = []
×
128
    for pid in sorted(psutil.pids()):
×
129
        try:
×
130
            proc = psutil.Process(pid)
×
131

132
            if is_renku_process(proc, include) and proc.status() != "zombie":
×
133
                renku_processes_all.append(proc)
×
134

135
        except (psutil.NoSuchProcess, psutil.ZombieProcess):
×
136
            continue
×
137

138
    renku_proc_info = sorted(
×
139
        [
140
            {
141
                "create_time": datetime.fromtimestamp(proc.create_time()).strftime("%d.%b %H:%M"),
142
                "pid": proc.pid,
143
                "cmdline": f"renku {' '.join(proc.cmdline()[2:])}",
144
                "status": proc.status(),
145
                "mem_perct": proc.memory_percent(),
146
                "cpu_perct": proc.cpu_percent(),
147
                "num_threads": proc.num_threads(),
148
            }
149
            for proc in renku_processes_all
150
        ],
151
        key=lambda k: k["cmdline"],
152
    )
153

154
    return renku_proc_info
×
155

156

157
def read_logs(log_file, follow=True, output_all=False):
6✔
158
    """Read logs file. Supports following logs in realtime."""
159
    if follow and not output_all:
×
160
        log_file.seek(0, os.SEEK_END)
×
161

162
    while True:
163
        line = log_file.readline()
×
164
        if not line and follow:
×
165
            time.sleep(0.1)
×
166
            continue
×
167

168
        if not line and not follow:
×
169
            return
×
170

171
        yield line
×
172

173

174
@click.group()
6✔
175
@click.option("-e", "--env", default=None, type=click.Path(exists=True, dir_okay=False), help="Path to the .env file.")
6✔
176
@click.pass_context
6✔
177
def service(ctx, env):
6✔
178
    """Manage service components."""
179
    import redis
×
180
    import rq  # noqa: F401
×
181
    from dotenv import load_dotenv
×
182

183
    try:
×
184
        from renku.ui.service.cache.base import BaseCache
×
185

186
        BaseCache.cache.ping()
×
187

188
        load_dotenv(dotenv_path=env)
×
189
    except ImportError:
×
190
        # NOTE: Service dependency is missing.
191

192
        click.echo(
×
193
            ERROR + "Dependency not found! "
194
            "Please install `pip install renku[service]` to enable service component control."
195
        )
196

197
        ctx.exit(1)
×
198

199
    except redis.exceptions.ConnectionError as e:
×
200
        # NOTE: Cannot connect to the service dependencies, ie. Redis.
201
        click.echo(ERROR + f"Cannot connect to Redis: {e}")
×
202

203
        ctx.exit(1)
×
204

205

206
@service.command(name="api")
6✔
207
@click.option(
6✔
208
    "-a",
209
    "--addr",
210
    type=str,
211
    default="0.0.0.0",
212
    show_default=True,
213
    help="Address on which API service should listen to. By default uses IPv4.",
214
)
215
@click.option(
6✔
216
    "-p",
217
    "--port",
218
    type=int,
219
    default=8080,
220
    show_default=True,
221
    help="Port on which API service should listen to. Avoid ports below 1024, for those use reverse-proxies.",
222
)
223
@click.option(
6✔
224
    "-t",
225
    "--timeout",
226
    type=int,
227
    default=600,
228
    show_default=True,
229
    help="Request silent for more than this many seconds are dropped.",
230
)
231
def api_start(addr, port, timeout):
6✔
232
    """Start service JSON-RPC API in active shell session."""
233
    run_api(addr, port, timeout)
×
234

235

236
@service.command(name="scheduler")
6✔
237
def scheduler_start():
6✔
238
    """Start service scheduler in active shell session."""
239
    from renku.ui.service.scheduler import start_scheduler
×
240

241
    start_scheduler()
×
242

243

244
@service.command(name="worker")
6✔
245
@click.option("-q", "--queue", multiple=True)
6✔
246
def worker_start(queue):
6✔
247
    """Start service worker in active shell session. By default it listens on all queues."""
248
    run_worker([q.strip() for q in queue if q])
×
249

250

251
@service.command(name="ps")
6✔
252
@click.pass_context
6✔
253
def ps(ctx):
6✔
254
    """Check status of running services."""
255
    from renku.core.util.tabulate import tabulate
×
256

257
    processes = list_renku_processes()
×
258
    headers = [{k.upper(): v for k, v in rec.items()} for rec in processes]
×
259

260
    output = tabulate(processes, headers=headers)
×
261

262
    if not processes:
×
263
        click.echo("Renku service components are down.")
×
264
        ctx.exit()
×
265

266
    click.echo(output)
×
267

268

269
@service.command(name="up")
6✔
270
@click.option("-d", "--daemon", is_flag=True, default=False, help="Starts all processes in daemon mode.")
6✔
271
@click.option("-rd", "--runtime-dir", default=".", help="Directory for runtime metadata in daemon mode.")
6✔
272
@click.pass_context
6✔
273
def all_start(ctx, daemon, runtime_dir):
6✔
274
    """Start all service components."""
275
    from circus import get_arbiter
×
276

277
    from renku.core.util.contexts import chdir
×
278

279
    services = [
×
280
        {
281
            "name": "RenkuCoreService",
282
            "cmd": "renku",
283
            "args": ["service", "api"],
284
            "numprocesses": 1,
285
            "env": os.environ.copy(),
286
            "shell": True,
287
        },
288
        {
289
            "name": "RenkuCoreScheduler",
290
            "cmd": "renku",
291
            "args": ["service", "scheduler"],
292
            "numprocesses": 1,
293
            "env": os.environ.copy(),
294
            "shell": True,
295
        },
296
        {
297
            "name": "RenkuCoreWorker",
298
            "cmd": "renku",
299
            "args": ["service", "worker"],
300
            "numprocesses": 1,
301
            "env": os.environ.copy(),
302
            "shell": True,
303
        },
304
    ]
305

306
    def launch_arbiter(arbiter):
×
307
        """Helper for launching arbiter process."""
308
        with chdir(runtime_dir):
×
309
            try:
×
310
                arbiter.start()
×
311
            finally:
312
                arbiter.stop()
×
313

314
    if not daemon:
×
315
        launch_arbiter(get_arbiter(services))
×
316
        ctx.exit()
×
317

318
    # NOTE: If we are running in daemon mode, the runtime directory is generated is OS /tmp directory.
319
    # Since in this case daemon is long running process we don't want to pollute user space.
320
    if not runtime_dir or runtime_dir == ".":
×
321
        runtime_dir = tempfile.mkdtemp()
×
322

323
    os.environ["CACHE_DIR"] = runtime_dir
×
324
    click.echo(f"Using runtime directory: {runtime_dir}")
×
325

326
    log_stdout = Path(runtime_dir) / RENKU_DAEMON_LOG_FILE
×
327
    log_stderr = Path(runtime_dir) / RENKU_DAEMON_ERR_FILE
×
328

329
    subprocess.Popen(
×
330
        ["renku", "service", "up", "--runtime-dir", runtime_dir],
331
        stdout=log_stdout.open(mode="w"),
332
        stderr=log_stderr.open(mode="w"),
333
        start_new_session=True,
334
    )
335

336
    click.secho("OK", fg=color.GREEN)
×
337

338

339
@service.command(name="down")
6✔
340
def all_stop():
6✔
341
    """Stop all service components."""
342
    # NOTE: We include `renku service up` because that process contains the arbiter and watcher.
343
    processes = list_renku_processes(["up"])
×
344

345
    for proc in processes:
×
346
        click.echo(f"Shutting down [{proc['pid']}] `{proc['cmdline']}`")
×
347
        try:
×
348
            os.kill(proc["pid"], signal.SIGKILL)
×
349
        except ProcessLookupError:
×
350
            click.echo(f"Process [{proc['pid']}] `{proc['cmdline']}` not found - skipping")
×
351
            continue
×
352

353
    if processes:
×
354
        click.secho("OK", fg=color.GREEN)
×
355
    else:
356
        click.echo("Nothing to shut down.")
×
357

358

359
@service.command(name="restart")
6✔
360
def all_restart():
6✔
361
    """Restart all running service components."""
362
    processes = list_renku_processes()
×
363

364
    for proc in processes:
×
365
        click.echo(f"Restarting `{proc['cmdline']}`")
×
366
        os.kill(proc["pid"], signal.SIGKILL)
×
367

368
    if processes:
×
369
        click.secho("OK", fg=color.GREEN)
×
370
    else:
371
        click.echo("Nothing to restart.")
×
372

373

374
@service.command(name="logs")
6✔
375
@click.option("-f", "--follow", is_flag=True, default=False, help="Follows logs of damonized service components.")
6✔
376
@click.option(
6✔
377
    "-a", "--output-all", is_flag=True, default=False, help="Outputs ALL logs of damonized service components."
378
)
379
@click.option("-e", "--errors", is_flag=True, default=False, help="Outputs all errors of damonized service components.")
6✔
380
@click.pass_context
6✔
381
def all_logs(ctx, follow, output_all, errors):
6✔
382
    """Check logs of all running daemonized service components."""
383
    processes = list_renku_processes(["up"])
×
384

385
    if not processes:
×
386
        click.echo("Daemonized component processes are not running.\nStart them with `renku service up --daemon`")
×
387
        ctx.exit()
×
388

389
    for proc in processes:
×
390
        if "cmdline" in proc and "up" in proc["cmdline"]:
×
391
            runtime_dir = Path(proc["cmdline"].split("--runtime-dir")[-1].strip())
×
392

393
            stream = runtime_dir / RENKU_DAEMON_LOG_FILE
×
394
            if errors:
×
395
                stream = runtime_dir / RENKU_DAEMON_ERR_FILE
×
396

397
            for line in read_logs(stream.open(mode="r"), follow=follow, output_all=output_all):
×
398
                click.echo(line)
×
399

400

401
@service.command(name="apispec")
6✔
402
def apispec():
6✔
403
    """Return the api spec."""
404
    from renku.ui.service.entrypoint import app
×
405
    from renku.ui.service.views.apispec import get_apispec
×
406

407
    with app.test_request_context():
×
408
        click.echo(get_apispec(app).to_yaml())
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc