popstas / talks-reducer · build 18416480696 (push · github · web-flow)
10 Oct 2025 07:29PM UTC · coverage: 44.81% (+0.6%, up from 44.186%)
Commit: Refactor CLI application and update tests (#85)

102 of 141 new or added lines in 1 file covered (72.34%).
87 existing lines in 1 file are now uncovered.
2832 of 6320 relevant lines covered (44.81%); 0.45 hits per line.

Source file: /talks_reducer/cli.py (58.64% covered)
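The headline figures follow directly from the line counts above; a quick sanity check (illustrative arithmetic only, not part of the repository):

covered, relevant = 2832, 6320
new_covered, new_added = 102, 141
print(f"{covered / relevant:.2%}")       # 44.81% overall coverage
print(f"{new_covered / new_added:.2%}")  # 72.34% of new or added lines
print(f"{covered / relevant:.2f}")       # 0.45, consistent with the reported hits per line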
"""Command line interface for the talks reducer package."""

from __future__ import annotations

import argparse
import os
import shutil
import subprocess
import sys
import time
from importlib import import_module
from pathlib import Path
from typing import Callable, Dict, List, Optional, Sequence, Tuple

from . import audio
from .ffmpeg import FFmpegNotFoundError
from .models import ProcessingOptions, default_temp_folder
from .pipeline import speed_up_video
from .progress import TqdmProgressReporter
from .version_utils import resolve_version


def _build_parser() -> argparse.ArgumentParser:
    """Create the argument parser used by the command line interface."""

    parser = argparse.ArgumentParser(
        description="Modifies a video file to play at different speeds when there is sound vs. silence.",
    )

    # Add version argument
    pkg_version = resolve_version()

    parser.add_argument(
        "--version",
        action="version",
        version=f"talks-reducer {pkg_version}",
    )

    parser.add_argument(
        "input_file",
        type=str,
        nargs="+",
        help="The video file(s) you want modified. Can be one or more directories and / or single files.",
    )
    parser.add_argument(
        "-o",
        "--output_file",
        type=str,
        dest="output_file",
        help="The output file. Only usable if a single file is given. If not included, it'll append _ALTERED to the name.",
    )
    parser.add_argument(
        "--temp_folder",
        type=str,
        default=str(default_temp_folder()),
        help="The file path of the temporary working folder.",
    )
    parser.add_argument(
        "-t",
        "--silent_threshold",
        type=float,
        dest="silent_threshold",
        help="The volume amount that frames' audio needs to surpass to be considered sounded. Defaults to 0.05.",
    )
    parser.add_argument(
        "-S",
        "--sounded_speed",
        type=float,
        dest="sounded_speed",
        help="The speed that sounded (spoken) frames should be played at. Defaults to 1.",
    )
    parser.add_argument(
        "-s",
        "--silent_speed",
        type=float,
        dest="silent_speed",
        help="The speed that silent frames should be played at. Defaults to 4.",
    )
    parser.add_argument(
        "-fm",
        "--frame_margin",
        type=float,
        dest="frame_spreadage",
        help="Some silent frames adjacent to sounded frames are included to provide context. Defaults to 2.",
    )
    parser.add_argument(
        "-sr",
        "--sample_rate",
        type=float,
        dest="sample_rate",
        help="Sample rate of the input and output videos. Usually extracted automatically by FFmpeg.",
    )
    parser.add_argument(
        "--small",
        action="store_true",
        help="Apply small file optimizations: resize video to 720p, audio to 128k bitrate, best compression (uses CUDA if available).",
    )
    parser.add_argument(
        "--url",
        dest="server_url",
        default=None,
        help="Process videos via a Talks Reducer server at the provided base URL (for example, http://localhost:9005).",
    )
    parser.add_argument(
        "--host",
        dest="host",
        default=None,
        help="Shortcut for --url when targeting a Talks Reducer server on port 9005 (for example, localhost).",
    )
    parser.add_argument(
        "--server-stream",
        action="store_true",
        help="Stream remote progress updates when using --url.",
    )
    return parser

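A quick illustration of the parser above (not part of cli.py; the file name and server URL are hypothetical): a typical invocation and the namespace fields it produces.

# Illustrative sketch only -- exercises _build_parser() with made-up arguments.
example_parser = _build_parser()
example_args = example_parser.parse_args(
    ["talk.mp4", "--small", "--url", "http://localhost:9005", "-s", "4", "-S", "1.5"]
)
# example_args.input_file == ["talk.mp4"]; example_args.small is True;
# example_args.server_url == "http://localhost:9005"; silent/sounded speeds parse as floats.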

def gather_input_files(paths: List[str]) -> List[str]:
    """Expand provided paths into a flat list of files that contain audio streams."""

    files: List[str] = []
    for input_path in paths:
        if os.path.isfile(input_path) and audio.is_valid_input_file(input_path):
            files.append(os.path.abspath(input_path))
        elif os.path.isdir(input_path):
            for file in os.listdir(input_path):
                candidate = os.path.join(input_path, file)
                if audio.is_valid_input_file(candidate):
                    files.append(candidate)
    return files

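A sketch of how the helper above expands its arguments (the paths are hypothetical): plain files come back as absolute paths, directories are scanned one level deep, and every candidate is filtered through audio.is_valid_input_file.

# Illustrative sketch only -- hypothetical paths.
inputs = gather_input_files(["talk.mp4", "recordings/"])
# e.g. ["/abs/path/to/talk.mp4", "recordings/clip1.mkv", ...]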

def _print_total_time(start_time: float) -> None:
    """Print the elapsed processing time since *start_time*."""

    end_time = time.time()
    total_time = end_time - start_time
    hours, remainder = divmod(total_time, 3600)
    minutes, seconds = divmod(remainder, 60)
    print(f"\nTime: {int(hours)}h {int(minutes)}m {seconds:.2f}s")


class CliApplication:
    """Coordinator for CLI processing with dependency injection support."""

    def __init__(
        self,
        *,
        gather_files: Callable[[List[str]], List[str]],
        send_video: Optional[Callable[..., Tuple[Path, str, str]]],
        speed_up: Callable[[ProcessingOptions, object], object],
        reporter_factory: Callable[[], object],
        remote_error_message: Optional[str] = None,
    ) -> None:
        self._gather_files = gather_files
        self._send_video = send_video
        self._speed_up = speed_up
        self._reporter_factory = reporter_factory
        self._remote_error_message = remote_error_message

    def run(self, parsed_args: argparse.Namespace) -> Tuple[int, List[str]]:
        """Execute the CLI pipeline for *parsed_args*."""

        start_time = time.time()
        files = self._gather_files(parsed_args.input_file)

        args: Dict[str, object] = {
            key: value for key, value in vars(parsed_args).items() if value is not None
        }
        del args["input_file"]

        if "host" in args:
            del args["host"]

        if len(files) > 1 and "output_file" in args:
            del args["output_file"]

        error_messages: List[str] = []
        reporter_logs: List[str] = []

        if getattr(parsed_args, "server_url", None):
            remote_success, remote_errors, fallback_logs = self._process_via_server(
                files, parsed_args, start_time
            )
            error_messages.extend(remote_errors)
            reporter_logs.extend(fallback_logs)
            if remote_success:
                return 0, error_messages

        reporter = self._reporter_factory()
        for message in reporter_logs:
            reporter.log(message)

        for index, file in enumerate(files):
            print(
                f"Processing file {index + 1}/{len(files)} '{os.path.basename(file)}'"
            )
            local_options = dict(args)

            option_kwargs: Dict[str, object] = {"input_file": Path(file)}

            if "output_file" in local_options:
                option_kwargs["output_file"] = Path(local_options["output_file"])
            if "temp_folder" in local_options:
                option_kwargs["temp_folder"] = Path(local_options["temp_folder"])
            if "silent_threshold" in local_options:
                option_kwargs["silent_threshold"] = float(
                    local_options["silent_threshold"]
                )
            if "silent_speed" in local_options:
                option_kwargs["silent_speed"] = float(local_options["silent_speed"])
            if "sounded_speed" in local_options:
                option_kwargs["sounded_speed"] = float(local_options["sounded_speed"])
            if "frame_spreadage" in local_options:
                option_kwargs["frame_spreadage"] = int(local_options["frame_spreadage"])
            if "sample_rate" in local_options:
                option_kwargs["sample_rate"] = int(local_options["sample_rate"])
            if "small" in local_options:
                option_kwargs["small"] = bool(local_options["small"])
            options = ProcessingOptions(**option_kwargs)

            try:
                result = self._speed_up(options, reporter=reporter)
            except FFmpegNotFoundError as exc:
                message = str(exc)
                return 1, [*error_messages, message]

            reporter.log(f"Completed: {result.output_file}")
            summary_parts: List[str] = []
            time_ratio = getattr(result, "time_ratio", None)
            size_ratio = getattr(result, "size_ratio", None)
            if time_ratio is not None:
                summary_parts.append(f"{time_ratio * 100:.0f}% time")
            if size_ratio is not None:
                summary_parts.append(f"{size_ratio * 100:.0f}% size")
            if summary_parts:
                reporter.log("Result: " + ", ".join(summary_parts))

        _print_total_time(start_time)
        return 0, error_messages

    def _process_via_server(
        self,
        files: Sequence[str],
        parsed_args: argparse.Namespace,
        start_time: float,
    ) -> Tuple[bool, List[str], List[str]]:
        """Upload *files* to the configured server and download the results."""

        if not self._send_video:
            message = self._remote_error_message or "Server processing is unavailable."
            fallback_notice = "Falling back to local processing pipeline."
            return False, [message, fallback_notice], [message, fallback_notice]

        server_url = parsed_args.server_url
        if not server_url:
            message = "Server URL was not provided."
            fallback_notice = "Falling back to local processing pipeline."
            return False, [message, fallback_notice], [message, fallback_notice]

        output_override: Optional[Path] = None
        if parsed_args.output_file and len(files) == 1:
            output_override = Path(parsed_args.output_file).expanduser()
        elif parsed_args.output_file and len(files) > 1:
            print(
                "Warning: --output is ignored when processing multiple files via the server.",
                file=sys.stderr,
            )

        remote_option_values: Dict[str, float] = {}
        if parsed_args.silent_threshold is not None:
            remote_option_values["silent_threshold"] = float(
                parsed_args.silent_threshold
            )
        if parsed_args.silent_speed is not None:
            remote_option_values["silent_speed"] = float(parsed_args.silent_speed)
        if parsed_args.sounded_speed is not None:
            remote_option_values["sounded_speed"] = float(parsed_args.sounded_speed)

        unsupported_options: List[str] = []
        for name in ("frame_spreadage", "sample_rate", "temp_folder"):
            if getattr(parsed_args, name) is not None:
                unsupported_options.append(f"--{name.replace('_', '-')}")

        if unsupported_options:
            print(
                "Warning: the following options are ignored when using --url: "
                + ", ".join(sorted(unsupported_options)),
                file=sys.stderr,
            )

        for index, file in enumerate(files, start=1):
            basename = os.path.basename(file)
            print(
                f"Processing file {index}/{len(files)} '{basename}' via server {server_url}"
            )
            printed_log_header = False
            progress_state: dict[str, tuple[Optional[int], Optional[int], str]] = {}
            stream_updates = bool(getattr(parsed_args, "server_stream", False))

            def _stream_server_log(line: str) -> None:
                nonlocal printed_log_header
                if not printed_log_header:
                    print("\nServer log:", flush=True)
                    printed_log_header = True
                print(line, flush=True)

            def _stream_progress(
                desc: str, current: Optional[int], total: Optional[int], unit: str
            ) -> None:
                key = desc or "Processing"
                state = (current, total, unit)
                if progress_state.get(key) == state:
                    return
                progress_state[key] = state

                parts: List[str] = []
                if current is not None and total and total > 0:
                    percent = (current / total) * 100
                    parts.append(f"{current}/{total}")
                    parts.append(f"{percent:.1f}%")
                elif current is not None:
                    parts.append(str(current))
                if unit:
                    parts.append(unit)
                message = " ".join(parts).strip()
                print(f"{key}: {message or 'update'}", flush=True)

            try:
                destination, summary, log_text = self._send_video(
                    input_path=Path(file),
                    output_path=output_override,
                    server_url=server_url,
                    small=bool(parsed_args.small),
                    **remote_option_values,
                    log_callback=_stream_server_log,
                    stream_updates=stream_updates,
                    progress_callback=_stream_progress if stream_updates else None,
                )
            except Exception as exc:  # pragma: no cover - network failure safeguard
                message = f"Failed to process {basename} via server: {exc}"
                fallback_notice = "Falling back to local processing pipeline."
                return False, [message, fallback_notice], [message, fallback_notice]

            print(summary)
            print(f"Saved processed video to {destination}")
            if log_text.strip() and not printed_log_header:
                print("\nServer log:\n" + log_text)

        _print_total_time(start_time)
        return True, [], []

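Because CliApplication receives all of its collaborators as keyword arguments, both main() below and the tests can wire it up explicitly. A minimal sketch using the real local pipeline with no server client (the error message text is illustrative):

# Minimal wiring sketch -- mirrors what main() does when gradio_client is unavailable.
app = CliApplication(
    gather_files=gather_input_files,
    send_video=None,
    speed_up=speed_up_video,
    reporter_factory=TqdmProgressReporter,
    remote_error_message="Server mode requires the gradio_client dependency.",
)
# exit_code, errors = app.run(_build_parser().parse_args(["talk.mp4"]))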

def _launch_gui(argv: Sequence[str]) -> bool:
    """Attempt to launch the GUI with the provided arguments."""

    try:
        gui_module = import_module(".gui", __package__)
    except ImportError:
        return False

    gui_main = getattr(gui_module, "main", None)
    if gui_main is None:
        return False

    return bool(gui_main(list(argv)))


def _launch_server(argv: Sequence[str]) -> bool:
    """Attempt to launch the Gradio server with the provided arguments."""

    try:
        server_module = import_module(".server", __package__)
    except ImportError:
        return False

    server_main = getattr(server_module, "main", None)
    if server_main is None:
        return False

    server_main(list(argv))
    return True


def _find_server_tray_binary() -> Optional[Path]:
    """Return the best available path to the server tray executable."""

    binary_name = "talks-reducer-server-tray"
    candidates: List[Path] = []

    which_path = shutil.which(binary_name)
    if which_path:
        candidates.append(Path(which_path))

    try:
        launcher_dir = Path(sys.argv[0]).resolve().parent
    except Exception:
        launcher_dir = None

    potential_names = [binary_name]
    if sys.platform == "win32":
        potential_names = [f"{binary_name}.exe", binary_name]

    if launcher_dir is not None:
        for name in potential_names:
            candidates.append(launcher_dir / name)

    for candidate in candidates:
        if candidate and candidate.exists() and os.access(candidate, os.X_OK):
            return candidate

    return None


def _should_hide_subprocess_console() -> bool:
    """Return ``True`` when a detached Windows launch should hide the console."""

    if sys.platform != "win32":
        return False

    try:
        import ctypes
    except Exception:  # pragma: no cover - optional runtime dependency
        return False

    try:
        get_console_window = ctypes.windll.kernel32.GetConsoleWindow  # type: ignore[attr-defined]
    except Exception:  # pragma: no cover - platform specific guard
        return False

    try:
        handle = get_console_window()
    except Exception:  # pragma: no cover - defensive fallback
        return False

    return handle == 0


def _launch_server_tray_binary(argv: Sequence[str]) -> bool:
    """Launch the packaged server tray executable when available."""

    command = _find_server_tray_binary()
    if command is None:
        return False

    tray_args = [str(command), *list(argv)]

    run_kwargs: Dict[str, object] = {"check": False}

    if sys.platform == "win32":
        no_window_flag = getattr(subprocess, "CREATE_NO_WINDOW", 0)
        if no_window_flag and _should_hide_subprocess_console():
            run_kwargs["creationflags"] = no_window_flag

    try:
        result = subprocess.run(tray_args, **run_kwargs)
    except OSError:
        return False

    return result.returncode == 0


def _launch_server_tray(argv: Sequence[str]) -> bool:
    """Attempt to launch the server tray helper with the provided arguments."""

    if _launch_server_tray_binary(argv):
        return True

    try:
        tray_module = import_module(".server_tray", __package__)
    except ImportError:
        return False

    tray_main = getattr(tray_module, "main", None)
    if tray_main is None:
        return False

    tray_main(list(argv))
    return True


def main(argv: Optional[Sequence[str]] = None) -> None:
    """Entry point for the command line interface.

    Launch the GUI when run without arguments, otherwise defer to the CLI.
    """

    if argv is None:
        argv_list = sys.argv[1:]
    else:
        argv_list = list(argv)

    if "--server" in argv_list:
        index = argv_list.index("--server")
        tray_args = argv_list[index + 1 :]
        if not _launch_server_tray(tray_args):
            print("Server tray mode is unavailable.", file=sys.stderr)
            sys.exit(1)
        return

    if argv_list and argv_list[0] in {"server", "serve"}:
        if not _launch_server(argv_list[1:]):
            print("Gradio server mode is unavailable.", file=sys.stderr)
            sys.exit(1)
        return

    if not argv_list:
        if _launch_gui(argv_list):
            return

        parser = _build_parser()
        parser.print_help()
        return

    parser = _build_parser()
    parsed_args = parser.parse_args(argv_list)

    host_value = getattr(parsed_args, "host", None)
    if host_value:
        parsed_args.server_url = f"http://{host_value}:9005"

    send_video = None
    remote_error_message: Optional[str] = None
    try:  # pragma: no cover - optional dependency guard
        from . import service_client
    except ImportError as exc:
        remote_error_message = (
            "Server mode requires the gradio_client dependency. " f"({exc})"
        )
    else:
        send_video = service_client.send_video

    application = CliApplication(
        gather_files=gather_input_files,
        send_video=send_video,
        speed_up=speed_up_video,
        reporter_factory=TqdmProgressReporter,
        remote_error_message=remote_error_message,
    )

    exit_code, error_messages = application.run(parsed_args)
    for message in error_messages:
        print(message, file=sys.stderr)
    if exit_code:
        sys.exit(exit_code)


if __name__ == "__main__":
    main()