• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-python / 5948296099

23 Aug 2023 07:23AM UTC coverage: 85.801% (+0.04%) from 85.766%
5948296099

Pull #3601

github-actions

olevski
chore: run poetry lock
Pull Request #3601: hotfix: v2.6.1

40 of 48 new or added lines in 10 files covered. (83.33%)

285 existing lines in 25 files now uncovered.

25875 of 30157 relevant lines covered (85.8%)

4.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.69
/renku/ui/cli/service.py
1
#
2
# Copyright 2021 - Swiss Data Science Center (SDSC)
3
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
4
# Eidgenössische Technische Hochschule Zürich (ETHZ).
5
#
6
# Licensed under the Apache License, Version 2.0 (the "License");
7
# you may not use this file except in compliance with the License.
8
# You may obtain a copy of the License at
9
#
10
#     http://www.apache.org/licenses/LICENSE-2.0
11
#
12
# Unless required by applicable law or agreed to in writing, software
13
# distributed under the License is distributed on an "AS IS" BASIS,
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
# See the License for the specific language governing permissions and
16
# limitations under the License.
17
"""Commands to launch service components."""
11✔
18
import os
11✔
19
import signal
11✔
20
import subprocess
11✔
21
import sys
11✔
22
import tempfile
11✔
23
import time
11✔
24
from datetime import datetime
11✔
25
from pathlib import Path
11✔
26

27
import click
11✔
28
import psutil
11✔
29

30
import renku.ui.cli.utils.color as color
11✔
31
from renku.command.util import ERROR
11✔
32

33
# Name of the file, inside the runtime directory, that receives stdout of the daemonized `renku service up`.
RENKU_DAEMON_LOG_FILE = "renku.log"
11✔
34
# Name of the file, inside the runtime directory, that receives stderr of the daemonized `renku service up`.
RENKU_DAEMON_ERR_FILE = "renku.err"
11✔
35

36
# Sub-command names that `check_cmdline` looks for in a process command line to recognise a service component.
SERVICE_COMPONENT_TAGS = ["api", "scheduler", "worker"]
11✔
37

38

39
def run_api(addr="0.0.0.0", port=8080, timeout=600):
    """Launch the service JSON-RPC API under gunicorn and exit with its result.

    The gunicorn entry point is called without arguments, so its options are handed over by
    replacing ``sys.argv``.

    Args:
        addr: Address to bind to.
        port: Port to bind to.
        timeout: Request timeout in seconds; the ``REQUEST_TIMEOUT`` environment variable, when set,
            takes precedence.
    """
    from gunicorn.app.wsgiapp import run

    environ = os.environ
    worker_count = environ.get("RENKU_SVC_NUM_WORKERS", "1")
    thread_count = environ.get("RENKU_SVC_NUM_THREADS", "2")
    request_timeout = int(environ.get("REQUEST_TIMEOUT", timeout))

    gunicorn_args = ["gunicorn", "renku.ui.service.entrypoint:app", "--preload"]
    gunicorn_args += ["-b", f"{addr}:{port}"]
    gunicorn_args += ["--timeout", str(request_timeout)]
    gunicorn_args += ["--workers", worker_count]
    gunicorn_args += ["--worker-class", "gthread"]
    gunicorn_args += ["--threads", thread_count]
    gunicorn_args += ["--log-level", "debug"]

    sys.argv = gunicorn_args
    sys.exit(run())
×
69

70

71
def run_worker(queues):
    """Start a service worker on the requested queues.

    Queue selection falls back in this order: the ``queues`` argument, the comma-separated
    ``RENKU_SVC_WORKER_QUEUES`` environment variable, and finally the ``QUEUES`` default.

    Args:
        queues: Queue names to listen on; a falsy value triggers the fallbacks.
    """
    from renku.ui.service.jobs.queues import QUEUES
    from renku.ui.service.worker import start_worker

    if queues:
        start_worker(queues)
        return

    raw_value = os.getenv("RENKU_SVC_WORKER_QUEUES", "")
    stripped_names = (part.strip() for part in raw_value.strip().split(","))
    env_queues = [name for name in stripped_names if name]

    start_worker(env_queues or QUEUES)
×
84

85

86
def check_cmdline(cmdline, include=None):
    """Tell whether ``cmdline`` mentions a service component.

    Args:
        cmdline: Command line of a process; each tag is tested against it with ``in``.
        include: Optional list of extra tags checked before ``SERVICE_COMPONENT_TAGS``.

    Returns:
        bool: ``True`` if at least one tag is found in ``cmdline``.
    """
    tags = (include or []) + SERVICE_COMPONENT_TAGS
    return any(tag in cmdline for tag in tags)
2✔
96

97

98
def is_renku_process(process, include):
    """Return true if this is a renku process.

    A process qualifies when its command line contains a service component tag (see ``check_cmdline``)
    and either its name is ``renku`` or it is a python process with an argument ending in ``renku``.

    Args:
        process: A ``psutil.Process``-like object providing ``name()`` and ``cmdline()``.
        include: Extra tags, besides the service component tags, accepted by ``check_cmdline``.

    Returns:
        bool: ``True`` for a renku service process; ``False`` otherwise, including when the process
        has disappeared or cannot be inspected.
    """
    # NOTE: Every psutil query is guarded. Previously only the python branch was, so a process named
    # ``renku`` that we are not allowed to inspect raised ``psutil.AccessDenied`` to the caller.
    try:
        process_name = process.name().lower()
        if process_name != "renku" and "python" not in process_name:
            return False

        command_line = process.cmdline()
    except (psutil.AccessDenied, psutil.NoSuchProcess):
        return False

    if not check_cmdline(command_line, include):
        return False

    if process_name == "renku":
        return True

    # Python interpreter: only count it when one of its arguments ends with ``renku``.
    return any(line.endswith("renku") for line in command_line)
×
120

121

122
def list_renku_processes(include=None):
    """List renku processes.

    Args:
        include: Optional extra tags passed to ``is_renku_process`` on top of the default ones.

    Returns:
        list: One dictionary per running, non-zombie renku process with the keys ``create_time``,
        ``pid``, ``cmdline``, ``status``, ``mem_perct``, ``cpu_perct`` and ``num_threads``, sorted
        by ``cmdline``.
    """
    include = include or []

    renku_proc_info = []
    for pid in sorted(psutil.pids()):
        # NOTE: Details are read inside the same guarded block that identifies the process. A process
        # may exit, or turn out to be inaccessible, at any point after ``psutil.pids()`` returned;
        # such processes are skipped instead of aborting the whole listing.
        try:
            proc = psutil.Process(pid)

            if not is_renku_process(proc, include) or proc.status() == "zombie":
                continue

            renku_proc_info.append(
                {
                    "create_time": datetime.fromtimestamp(proc.create_time()).strftime("%d.%b %H:%M"),
                    "pid": proc.pid,
                    "cmdline": f"renku {' '.join(proc.cmdline()[2:])}",
                    "status": proc.status(),
                    "mem_perct": proc.memory_percent(),
                    "cpu_perct": proc.cpu_percent(),
                    "num_threads": proc.num_threads(),
                }
            )
        except (psutil.NoSuchProcess, psutil.ZombieProcess, psutil.AccessDenied):
            continue

    renku_proc_info.sort(key=lambda k: k["cmdline"])

    return renku_proc_info
2✔
154

155

156
def read_logs(log_file, follow=True, output_all=False):
    """Yield lines from an open log file, optionally waiting for new ones.

    Args:
        log_file: Open file object supporting ``seek`` and ``readline``.
        follow: When true, keep polling for new lines instead of stopping at the end of the file.
        output_all: When following, also yield the content already present in the file.

    Yields:
        Lines exactly as returned by ``readline``.
    """
    skip_existing = follow and not output_all
    if skip_existing:
        log_file.seek(0, os.SEEK_END)

    while True:
        entry = log_file.readline()
        if entry:
            yield entry
        elif follow:
            # Nothing new yet: wait a little before polling the file again.
            time.sleep(0.1)
        else:
            return
×
171

172

173
@click.group()
@click.option("-e", "--env", default=None, type=click.Path(exists=True, dir_okay=False), help="Path to the .env file.")
@click.pass_context
def service(ctx, env):
    """Manage service components."""
    # NOTE: The docstring above is the help text shown by click, so explanations live in comments.
    try:
        # NOTE: These imports double as a check that the service dependencies are installed. They are
        # inside the ``try`` block so that a missing package reaches the ``ImportError`` handler
        # below instead of surfacing as a raw traceback.
        import redis
        import rq  # noqa: F401
        from dotenv import load_dotenv

        if ctx.invoked_subcommand in ["apispec", "logs", "api"]:
            return  # Redis not needed

        from renku.ui.service.cache.base import BaseCache

        BaseCache.cache.ping()

        load_dotenv(dotenv_path=env)
    except ImportError:
        # NOTE: Service dependency is missing.

        click.echo(
            ERROR + "Dependency not found! "
            "Please install `pip install renku[service]` to enable service component control."
        )

        ctx.exit(1)

    except redis.exceptions.ConnectionError as e:
        # NOTE: Cannot connect to the service dependencies, ie. Redis.
        click.echo(ERROR + f"Cannot connect to Redis: {e}")

        ctx.exit(1)
×
206

207

208
@service.command(name="api")
@click.option(
    "-a",
    "--addr",
    type=str,
    default="0.0.0.0",
    show_default=True,
    help="Address on which API service should listen to. By default uses IPv4.",
)
@click.option(
    "-p",
    "--port",
    type=int,
    default=8080,
    show_default=True,
    help="Port on which API service should listen to. Avoid ports below 1024, for those use reverse-proxies.",
)
@click.option(
    "-t",
    "--timeout",
    type=int,
    default=600,
    show_default=True,
    help="Request silent for more than this many seconds are dropped.",
)
def api_start(addr, port, timeout):
    """Start service JSON-RPC API in active shell session."""
    # Thin CLI wrapper: forwards the parsed options unchanged to ``run_api``.
    run_api(addr=addr, port=port, timeout=timeout)
×
236

237

238
@service.command(name="scheduler")
def scheduler_start():
    """Start service scheduler in active shell session."""
    # Imported lazily so the service package is only loaded when this command actually runs.
    from renku.ui.service.scheduler import start_scheduler as launch_scheduler

    launch_scheduler()
×
244

245

246
@service.command(name="worker")
@click.option("-q", "--queue", multiple=True)
def worker_start(queue):
    """Start service worker in active shell session. By default it listens on all queues."""
    # Drop empty ``--queue`` values and trim whitespace from the remaining names.
    selected_queues = []
    for queue_name in queue:
        if queue_name:
            selected_queues.append(queue_name.strip())

    run_worker(selected_queues)
×
251

252

253
@service.command(name="ps")
@click.pass_context
def ps(ctx):
    """Check status of running services."""
    from renku.core.util.tabulate import tabulate

    processes = list_renku_processes()

    # One header mapping per record, with the record keys upper-cased.
    headers = []
    for record in processes:
        headers.append({key.upper(): value for key, value in record.items()})

    # The table is rendered before the empty check, as in the original order of operations.
    output = tabulate(processes, headers=headers)

    if processes:
        click.echo(output)
        return

    click.echo("Renku service components are down.")
    ctx.exit()
2✔
269

270

271
@service.command(name="up")
@click.option("-d", "--daemon", is_flag=True, default=False, help="Starts all processes in daemon mode.")
@click.option("-rd", "--runtime-dir", default=".", help="Directory for runtime metadata in daemon mode.")
@click.pass_context
def all_start(ctx, daemon, runtime_dir):
    """Start all service components."""
    from circus import get_arbiter

    from renku.core.util.contexts import chdir

    # One watcher definition per service component; each gets its own copy of the environment.
    components = (
        ("RenkuCoreService", "api"),
        ("RenkuCoreScheduler", "scheduler"),
        ("RenkuCoreWorker", "worker"),
    )
    services = [
        {
            "name": name,
            "cmd": "renku",
            "args": ["service", component],
            "numprocesses": 1,
            "env": os.environ.copy(),
            "shell": True,
        }
        for name, component in components
    ]

    def launch_arbiter(arbiter):
        """Helper for launching arbiter process."""
        with chdir(runtime_dir):
            try:
                arbiter.start()
            finally:
                arbiter.stop()

    if not daemon:
        # Foreground mode: run the arbiter in this process until it stops, then exit.
        launch_arbiter(get_arbiter(services))
        ctx.exit()

    # NOTE: If we are running in daemon mode and no runtime directory was given, one is generated in
    # the OS temporary directory. Since in this case the daemon is a long running process we don't
    # want to pollute user space.
    if not runtime_dir or runtime_dir == ".":
        runtime_dir = tempfile.mkdtemp()

    os.environ["CACHE_DIR"] = runtime_dir
    click.echo(f"Using runtime directory: {runtime_dir}")

    log_stdout = Path(runtime_dir) / RENKU_DAEMON_LOG_FILE
    log_stderr = Path(runtime_dir) / RENKU_DAEMON_ERR_FILE

    # NOTE: The child process gets its own copies of the log file descriptors when it is spawned, so
    # the parent's handles are closed right away instead of being leaked.
    with log_stdout.open(mode="w") as stdout_file, log_stderr.open(mode="w") as stderr_file:
        # Daemon mode re-runs `renku service up` in the foreground, detached into a new session.
        subprocess.Popen(
            ["renku", "service", "up", "--runtime-dir", runtime_dir],
            stdout=stdout_file,
            stderr=stderr_file,
            start_new_session=True,
        )

    click.secho("OK", fg=color.GREEN)
2✔
339

340

341
@service.command(name="down")
def all_stop():
    """Stop all service components."""
    # NOTE: We include `renku service up` because that process contains the arbiter and watcher.
    processes = list_renku_processes(["up"])

    if not processes:
        click.echo("Nothing to shut down.")
        return

    for proc in processes:
        pid, cmdline = proc["pid"], proc["cmdline"]
        click.echo(f"Shutting down [{pid}] `{cmdline}`")
        try:
            os.kill(pid, signal.SIGKILL)
        except ProcessLookupError:
            # The process went away between listing and killing; nothing left to do for it.
            click.echo(f"Process [{pid}] `{cmdline}` not found - skipping")

    click.secho("OK", fg=color.GREEN)
×
359

360

361
@service.command(name="restart")
def all_restart():
    """Restart all running service components."""
    # NOTE: Unlike `down`, the `renku service up` process is not included here, so only the component
    # processes are killed; presumably the arbiter running in `up` starts them again (confirm).
    processes = list_renku_processes()

    for proc in processes:
        click.echo(f"Restarting `{proc['cmdline']}`")
        try:
            os.kill(proc["pid"], signal.SIGKILL)
        except ProcessLookupError:
            # NOTE: Same handling as `down`: a process that exited after it was listed is skipped
            # instead of aborting the restart of the remaining components.
            click.echo(f"Process [{proc['pid']}] `{proc['cmdline']}` not found - skipping")
            continue

    if processes:
        click.secho("OK", fg=color.GREEN)
    else:
        click.echo("Nothing to restart.")
×
374

375

376
@service.command(name="logs")
@click.option("-f", "--follow", is_flag=True, default=False, help="Follows logs of daemonized service components.")
@click.option(
    "-a", "--output-all", is_flag=True, default=False, help="Outputs ALL logs of daemonized service components."
)
@click.option(
    "-e", "--errors", is_flag=True, default=False, help="Outputs all errors of daemonized service components."
)
@click.pass_context
def all_logs(ctx, follow, output_all, errors):
    """Check logs of all running daemonized service components."""
    processes = list_renku_processes(["up"])

    if not processes:
        click.echo("Daemonized component processes are not running.\nStart them with `renku service up --daemon`")
        ctx.exit()

    for proc in processes:
        if "cmdline" in proc and "up" in proc["cmdline"]:
            # The runtime directory is whatever follows `--runtime-dir` on the daemon's command line.
            runtime_dir = Path(proc["cmdline"].split("--runtime-dir")[-1].strip())

            stream = runtime_dir / RENKU_DAEMON_LOG_FILE
            if errors:
                stream = runtime_dir / RENKU_DAEMON_ERR_FILE

            # NOTE: The context manager closes the log file even when following is interrupted.
            with stream.open(mode="r") as log_file:
                for line in read_logs(log_file, follow=follow, output_all=output_all):
                    # NOTE(review): `line` keeps its trailing newline and `click.echo` appends another
                    # one, so the output is double-spaced - confirm whether that is intended.
                    click.echo(line)
×
401

402

403
@service.command(name="apispec")
def apispec():
    """Return the api spec."""
    from renku.ui.service.entrypoint import app
    from renku.ui.service.views.apispec import get_apispec

    # The spec is built and printed inside a test request context of the service app.
    with app.test_request_context():
        spec_yaml = get_apispec(app).to_yaml()
        click.echo(spec_yaml)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc