• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-python / 6875247711

15 Nov 2023 09:16AM UTC coverage: 82.786% (-0.05%) from 82.831%
6875247711

Pull #3300

github

web-flow
Merge e2d3269e8 into 4726f660e
Pull Request #3300: chore: do not always retry load tests requests

25441 of 30731 relevant lines covered (82.79%)

3.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

28.57
/renku/ui/cli/service.py
1
# Copyright Swiss Data Science Center (SDSC). A partnership between
2
# École Polytechnique Fédérale de Lausanne (EPFL) and
3
# Eidgenössische Technische Hochschule Zürich (ETHZ).
4
#
5
# Licensed under the Apache License, Version 2.0 (the "License");
6
# you may not use this file except in compliance with the License.
7
# You may obtain a copy of the License at
8
#
9
#     http://www.apache.org/licenses/LICENSE-2.0
10
#
11
# Unless required by applicable law or agreed to in writing, software
12
# distributed under the License is distributed on an "AS IS" BASIS,
13
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
# See the License for the specific language governing permissions and
15
# limitations under the License.
16
"""Commands to launch service components."""
7✔
17
import os
7✔
18
import signal
7✔
19
import subprocess
7✔
20
import sys
7✔
21
import tempfile
7✔
22
import time
7✔
23
from datetime import datetime
7✔
24
from pathlib import Path
7✔
25

26
import click
7✔
27
import psutil
7✔
28

29
import renku.ui.cli.utils.color as color
7✔
30
from renku.command.util import ERROR
7✔
31

32
# File names, relative to the daemon runtime directory, that receive the daemon's stdout and stderr.
RENKU_DAEMON_LOG_FILE = "renku.log"
7✔
33
RENKU_DAEMON_ERR_FILE = "renku.err"
7✔
34

35
# Command-line tokens used to recognize service component processes.
SERVICE_COMPONENT_TAGS = ["api", "worker"]
7✔
36

37

38
def run_api(addr="0.0.0.0", port=8080, timeout=600):
    """Launch the service JSON-RPC API under gunicorn.

    Worker and thread counts are read from ``RENKU_SVC_NUM_WORKERS`` (default ``"1"``) and
    ``RENKU_SVC_NUM_THREADS`` (default ``"2"``). ``REQUEST_TIMEOUT`` overrides ``timeout`` when it is set.
    The function does not return: it exits the process with the value returned by gunicorn's ``run``.
    """
    from gunicorn.app.wsgiapp import run as gunicorn_main

    workers = os.getenv("RENKU_SVC_NUM_WORKERS", "1")
    threads = os.getenv("RENKU_SVC_NUM_THREADS", "2")
    request_timeout = int(os.getenv("REQUEST_TIMEOUT", timeout))

    gunicorn_options = [
        "--preload",
        "-c",
        "gunicorn.conf.py",
        "-b",
        f"{addr}:{port}",
        "--timeout",
        f"{request_timeout}",
        "--workers",
        workers,
        "--worker-class",
        "gthread",
        "--threads",
        threads,
        "--log-level",
        "debug",
    ]

    # NOTE: ``run`` is called without arguments, so the options are handed over through ``sys.argv``.
    sys.argv = ["gunicorn", "renku.ui.service.entrypoint:app", *gunicorn_options]

    sys.exit(gunicorn_main())
×
70

71

72
def run_worker(queues):
    """Start a service worker on the given queues.

    When ``queues`` is empty, queue names are taken from the comma-separated ``RENKU_SVC_WORKER_QUEUES``
    environment variable. If that yields no names either, the ``QUEUES`` default is used.
    """
    from renku.ui.service.jobs.queues import QUEUES
    from renku.ui.service.worker import start_worker

    if queues:
        start_worker(queues)
        return

    raw_names = os.getenv("RENKU_SVC_WORKER_QUEUES", "").strip().split(",")
    configured = [name.strip() for name in raw_names if name.strip()]

    start_worker(configured or QUEUES)
×
85

86

87
def check_cmdline(cmdline, include=None):
    """Return ``True`` if ``cmdline`` contains a service component tag or any entry of ``include``."""
    wanted_tags = (include or []) + SERVICE_COMPONENT_TAGS

    return any(tag in cmdline for tag in wanted_tags)
×
97

98

99
def is_renku_process(process, include):
    """Return true if this is a renku process.

    A process qualifies when its command line passes ``check_cmdline`` and either its name is ``renku``
    or it is a python process with a command-line argument ending in ``renku``.

    Args:
        process: A ``psutil.Process`` to inspect.
        include: Extra command-line tokens, passed on to ``check_cmdline``.

    Returns:
        bool: ``True`` for a renku service process. ``False`` otherwise, including when the process
        disappeared or may not be inspected.
    """
    try:
        process_name = process.name().lower()

        # Skip the command-line lookup for processes that can never match.
        if process_name != "renku" and "python" not in process_name:
            return False

        command_line = process.cmdline()
    except (psutil.AccessDenied, psutil.NoSuchProcess):
        # NOTE: Every psutil call sits inside the ``try`` so an inaccessible or vanished process is
        # reported as "not renku" instead of raising.
        return False

    if not check_cmdline(command_line, include):
        return False

    return process_name == "renku" or any(arg.endswith("renku") for arg in command_line)
×
121

122

123
def list_renku_processes(include=None):
    """List renku processes.

    Args:
        include: Optional extra command-line tokens, passed on to ``is_renku_process``.

    Returns:
        list: One dict per running, non-zombie renku process with the keys ``create_time``, ``pid``,
        ``cmdline``, ``status``, ``mem_perct``, ``cpu_perct`` and ``num_threads``, sorted by ``cmdline``.
    """
    include = include or []

    renku_proc_info = []
    for pid in sorted(psutil.pids()):
        try:
            proc = psutil.Process(pid)

            if not is_renku_process(proc, include) or proc.status() == "zombie":
                continue

            # NOTE: Details are collected inside the same ``try`` as the detection. A process can exit
            # at any moment, and reading it later in a separate pass would raise ``NoSuchProcess``.
            renku_proc_info.append(
                {
                    "create_time": datetime.fromtimestamp(proc.create_time()).strftime("%d.%b %H:%M"),
                    "pid": proc.pid,
                    "cmdline": f"renku {' '.join(proc.cmdline()[2:])}",
                    "status": proc.status(),
                    "mem_perct": proc.memory_percent(),
                    "cpu_perct": proc.cpu_percent(),
                    "num_threads": proc.num_threads(),
                }
            )
        except (psutil.NoSuchProcess, psutil.ZombieProcess, psutil.AccessDenied):
            # Process vanished, turned into a zombie, or may not be inspected: leave it out.
            continue

    renku_proc_info.sort(key=lambda info: info["cmdline"])

    return renku_proc_info
×
155

156

157
def read_logs(log_file, follow=True, output_all=False, poll_interval=0.1):
    """Read logs file. Supports following logs in realtime.

    Args:
        log_file: Open, seekable text file object to read from.
        follow: When true, keep waiting for new lines instead of stopping at end of file.
            The generator then never finishes on its own.
        output_all: When following, start from the beginning of the file instead of its end.
        poll_interval: Seconds to sleep between checks for new content while following.

    Yields:
        str: Lines exactly as returned by ``readline`` (any trailing newline is kept).
    """
    if follow and not output_all:
        # Only content appended from now on is of interest.
        log_file.seek(0, os.SEEK_END)

    while True:
        line = log_file.readline()

        if line:
            yield line
        elif follow:
            # Nothing new yet: wait before polling again.
            time.sleep(poll_interval)
        else:
            return
×
172

173

174
@click.group()
@click.option("-e", "--env", default=None, type=click.Path(exists=True, dir_okay=False), help="Path to the .env file.")
@click.pass_context
def service(ctx, env):
    """Manage service components."""

    def missing_dependency():
        # Report a missing service dependency and exit with status 1.
        click.echo(
            ERROR + "Dependency not found! "
            "Please install `pip install renku[service]` to enable service component control."
        )

        ctx.exit(1)

    # NOTE: The optional dependencies are imported inside a ``try`` so that a missing package is
    # reported with the install hint instead of a raw ``ImportError`` traceback.
    try:
        import redis
        import rq  # noqa: F401
        from dotenv import load_dotenv
    except ImportError:
        missing_dependency()

    if ctx.invoked_subcommand in ["apispec", "logs", "api"]:
        return  # Redis not needed

    try:
        from renku.ui.service.cache.base import BaseCache

        BaseCache.cache.ping()

        load_dotenv(dotenv_path=env)
    except ImportError:
        # NOTE: Service dependency is missing.
        missing_dependency()

    except redis.exceptions.ConnectionError as e:
        # NOTE: Cannot connect to the service dependencies, ie. Redis.
        click.echo(ERROR + f"Cannot connect to Redis: {e}")

        ctx.exit(1)
×
207

208

209
@service.command(name="api")
@click.option(
    "-a",
    "--addr",
    type=str,
    default="0.0.0.0",
    show_default=True,
    help="Address on which API service should listen to. By default uses IPv4.",
)
@click.option(
    "-p",
    "--port",
    type=int,
    default=8080,
    show_default=True,
    help="Port on which API service should listen to. Avoid ports below 1024, for those use reverse-proxies.",
)
@click.option(
    "-t",
    "--timeout",
    type=int,
    default=600,
    show_default=True,
    help="Request silent for more than this many seconds are dropped.",
)
def api_start(addr, port, timeout):
    """Start service JSON-RPC API in active shell session."""
    # Thin CLI wrapper: the parsed options are forwarded unchanged.
    run_api(addr=addr, port=port, timeout=timeout)
×
237

238

239
@service.command(name="worker")
@click.option("-q", "--queue", multiple=True)
def worker_start(queue):
    """Start service worker in active shell session. By default it listens on all queues."""
    # Filter on the stripped value, as ``run_worker`` does for the environment variable, so a
    # whitespace-only ``-q`` value does not turn into an empty queue name.
    stripped_names = (name.strip() for name in queue)

    run_worker([name for name in stripped_names if name])
×
244

245

246
@service.command(name="ps")
@click.pass_context
def ps(ctx):
    """Check status of running services."""
    from renku.core.util.tabulate import tabulate

    processes = list_renku_processes()

    # Handle the empty case before building the table, so nothing is formatted for an empty list.
    if not processes:
        click.echo("Renku service components are down.")
        ctx.exit()

    # NOTE(review): ``headers`` is a list holding one upper-cased copy of every record. Confirm that
    # this is the shape ``tabulate`` expects.
    headers = [{k.upper(): v for k, v in rec.items()} for rec in processes]

    click.echo(tabulate(processes, headers=headers))
×
262

263

264
@service.command(name="up")
@click.option("-d", "--daemon", is_flag=True, default=False, help="Starts all processes in daemon mode.")
@click.option("-rd", "--runtime-dir", default=".", help="Directory for runtime metadata in daemon mode.")
@click.pass_context
def all_start(ctx, daemon, runtime_dir):
    """Start all service components."""
    from circus import get_arbiter

    from renku.core.util.contexts import chdir

    services = [
        {
            "name": "RenkuCoreService",
            "cmd": "renku",
            "args": ["service", "api"],
            "numprocesses": 1,
            "env": os.environ.copy(),
            "shell": True,
        },
        {
            "name": "RenkuCoreWorker",
            "cmd": "renku",
            "args": ["service", "worker"],
            "numprocesses": 1,
            "env": os.environ.copy(),
            "shell": True,
        },
    ]

    def launch_arbiter(arbiter):
        """Helper for launching arbiter process."""
        with chdir(runtime_dir):
            try:
                arbiter.start()
            finally:
                # Always stop the arbiter, even when ``start`` raises.
                arbiter.stop()

    if not daemon:
        # Foreground mode: run the arbiter in this process, then exit.
        launch_arbiter(get_arbiter(services))
        ctx.exit()

    # NOTE: If we are running in daemon mode, the runtime directory is generated in the OS temporary directory.
    # Since in this case the daemon is a long running process, we don't want to pollute user space.
    if not runtime_dir or runtime_dir == ".":
        runtime_dir = tempfile.mkdtemp()

    os.environ["CACHE_DIR"] = runtime_dir
    click.echo(f"Using runtime directory: {runtime_dir}")

    log_stdout = Path(runtime_dir) / RENKU_DAEMON_LOG_FILE
    log_stderr = Path(runtime_dir) / RENKU_DAEMON_ERR_FILE

    # Daemon mode re-invokes `renku service up` (without `--daemon`) in a new session, with its output
    # redirected to the log files. The child gets its own copies of the descriptors, so the parent's
    # handles are closed as soon as the child has been spawned.
    with log_stdout.open(mode="w") as stdout_file, log_stderr.open(mode="w") as stderr_file:
        subprocess.Popen(
            ["renku", "service", "up", "--runtime-dir", runtime_dir],
            stdout=stdout_file,
            stderr=stderr_file,
            start_new_session=True,
        )

    click.secho("OK", fg=color.GREEN)
×
324

325

326
@service.command(name="down")
def all_stop():
    """Stop all service components."""
    # NOTE: We include `renku service up` because that process contains the arbiter and watcher.
    processes = list_renku_processes(["up"])

    for proc in processes:
        click.echo(f"Shutting down [{proc['pid']}] `{proc['cmdline']}`")
        try:
            os.kill(proc["pid"], signal.SIGKILL)
        except ProcessLookupError:
            # The process went away after it was listed.
            click.echo(f"Process [{proc['pid']}] `{proc['cmdline']}` not found - skipping")

    if not processes:
        click.echo("Nothing to shut down.")
        return

    click.secho("OK", fg=color.GREEN)
×
344

345

346
@service.command(name="restart")
def all_restart():
    """Restart all running service components."""
    processes = list_renku_processes()

    for proc in processes:
        click.echo(f"Restarting `{proc['cmdline']}`")
        try:
            # NOTE(review): this only kills the component; presumably the arbiter started by
            # `renku service up` respawns it. Confirm.
            os.kill(proc["pid"], signal.SIGKILL)
        except ProcessLookupError:
            # Same handling as `renku service down`: the process went away after it was listed.
            click.echo(f"Process [{proc['pid']}] `{proc['cmdline']}` not found - skipping")

    if processes:
        click.secho("OK", fg=color.GREEN)
    else:
        click.echo("Nothing to restart.")
×
359

360

361
@service.command(name="logs")
@click.option("-f", "--follow", is_flag=True, default=False, help="Follows logs of damonized service components.")
@click.option(
    "-a", "--output-all", is_flag=True, default=False, help="Outputs ALL logs of damonized service components."
)
@click.option("-e", "--errors", is_flag=True, default=False, help="Outputs all errors of damonized service components.")
@click.pass_context
def all_logs(ctx, follow, output_all, errors):
    """Check logs of all running daemonized service components."""
    processes = list_renku_processes(["up"])

    if not processes:
        click.echo("Daemonized component processes are not running.\nStart them with `renku service up --daemon`")
        ctx.exit()

    log_name = RENKU_DAEMON_ERR_FILE if errors else RENKU_DAEMON_LOG_FILE

    for proc in processes:
        cmdline = proc.get("cmdline", "")

        # The log location is recovered from the `--runtime-dir` argument of a `renku service up`
        # process. Processes without that argument give no usable path, so they are skipped.
        if "up" not in cmdline or "--runtime-dir" not in cmdline:
            continue

        runtime_dir = Path(cmdline.split("--runtime-dir")[-1].strip())
        stream = runtime_dir / log_name

        try:
            log_file = stream.open(mode="r")
        except FileNotFoundError:
            click.echo(ERROR + f"Log file not found: {stream}")
            continue

        # Close the file once reading stops, including when following is interrupted.
        with log_file:
            for line in read_logs(log_file, follow=follow, output_all=output_all):
                # Lines keep their own trailing newline, so echo must not append another one.
                click.echo(line, nl=False)
×
386

387

388
@service.command(name="apispec")
def apispec():
    """Return the api spec."""
    from renku.ui.service.entrypoint import app
    from renku.ui.service.views.apispec import get_apispec

    # The spec is generated and printed while a test request context of the app is active.
    with app.test_request_context():
        spec_yaml = get_apispec(app).to_yaml()
        click.echo(spec_yaml)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc