• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

popstas / talks-reducer / 18680948735

21 Oct 2025 10:30AM UTC coverage: 71.481% (+0.06%) from 71.425%
18680948735

Pull #130

github

web-flow
Merge 3ce70ca06 into e97cec030
Pull Request #130: feat: Add silence speed presets to GUI

63 of 66 new or added lines in 4 files covered. (95.45%)

313 existing lines in 5 files now uncovered.

5484 of 7672 relevant lines covered (71.48%)

0.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

28.83
/talks_reducer/gui/app.py
1
"""Tkinter GUI application for the talks reducer pipeline."""
2

3
from __future__ import annotations
1✔
4

5
import os
1✔
6
import re
1✔
7
import subprocess
1✔
8
import sys
1✔
9
import threading
1✔
10
import time
1✔
11
from pathlib import Path
1✔
12
from typing import (
1✔
13
    TYPE_CHECKING,
14
    Any,
15
    Callable,
16
    Iterable,
17
    List,
18
    Optional,
19
    Sequence,
20
    Tuple,
21
)
22

23
from . import hi_dpi  # should be imported before tkinter
1✔
24

25
if TYPE_CHECKING:
26
    import tkinter as tk
27
    from tkinter import filedialog, messagebox, ttk
28

29
try:
1✔
30
    from ..cli import gather_input_files
1✔
31
    from ..ffmpeg import FFmpegNotFoundError, is_global_ffmpeg_available
1✔
32
    from ..models import ProcessingOptions
1✔
33
    from ..pipeline import (
1✔
34
        ProcessingAborted,
35
        _input_to_output_filename,
36
        speed_up_video,
37
    )
38
    from ..progress import ProgressHandle
1✔
39
    from ..version_utils import resolve_version
1✔
40
    from . import discovery as discovery_helpers
1✔
41
    from . import layout as layout_helpers
1✔
42
    from .preferences import GUIPreferences, determine_config_path
1✔
43
    from .progress import _TkProgressReporter
1✔
44
    from .remote import (
1✔
45
        check_remote_server_for_gui,
46
        format_server_host,
47
        normalize_server_url,
48
        ping_server,
49
        process_files_via_server,
50
    )
51
    from .theme import (
1✔
52
        DARK_THEME,
53
        LIGHT_THEME,
54
        STATUS_COLORS,
55
        apply_theme,
56
        detect_system_theme,
57
        read_windows_theme_registry,
58
        run_defaults_command,
59
    )
60
except ImportError:  # pragma: no cover - handled at runtime
61
    if __package__ not in (None, ""):
62
        raise
63

64
    PACKAGE_ROOT = Path(__file__).resolve().parent.parent
65
    if str(PACKAGE_ROOT) not in sys.path:
66
        sys.path.insert(0, str(PACKAGE_ROOT))
67

68
    from talks_reducer.cli import gather_input_files
69
    from talks_reducer.ffmpeg import FFmpegNotFoundError, is_global_ffmpeg_available
70
    from talks_reducer.gui import discovery as discovery_helpers
71
    from talks_reducer.gui import layout as layout_helpers
72
    from talks_reducer.gui.preferences import GUIPreferences, determine_config_path
73
    from talks_reducer.gui.progress import _TkProgressReporter
74
    from talks_reducer.gui.remote import (
75
        check_remote_server_for_gui,
76
        format_server_host,
77
        normalize_server_url,
78
        ping_server,
79
        process_files_via_server,
80
    )
81
    from talks_reducer.gui.theme import (
82
        DARK_THEME,
83
        LIGHT_THEME,
84
        STATUS_COLORS,
85
        apply_theme,
86
        detect_system_theme,
87
        read_windows_theme_registry,
88
        run_defaults_command,
89
    )
90
    from talks_reducer.models import ProcessingOptions
91
    from talks_reducer.pipeline import (
92
        ProcessingAborted,
93
        _input_to_output_filename,
94
        speed_up_video,
95
    )
96
    from talks_reducer.progress import ProgressHandle
97
    from talks_reducer.version_utils import resolve_version
98

99
try:
1✔
100
    from tkinterdnd2 import DND_FILES, TkinterDnD
1✔
101
except ModuleNotFoundError:  # pragma: no cover - runtime dependency
102
    DND_FILES = None  # type: ignore[assignment]
103
    TkinterDnD = None  # type: ignore[assignment]
104

105

106
def _default_remote_destination(
1✔
107
    input_file: Path,
108
    *,
109
    small: bool,
110
    small_480: bool = False,
111
    add_codec_suffix: bool = False,
112
    video_codec: str = "hevc",
113
    silent_speed: float | None = None,
114
    sounded_speed: float | None = None,
115
) -> Path:
116
    """Return the default remote output path for *input_file*."""
117

118
    normalized_codec = str(video_codec or "hevc").strip().lower()
1✔
119
    target_height = 480 if small_480 else None
1✔
120

121
    return _input_to_output_filename(
1✔
122
        input_file,
123
        small,
124
        target_height,
125
        video_codec=normalized_codec,
126
        add_codec_suffix=add_codec_suffix,
127
        silent_speed=silent_speed,
128
        sounded_speed=sounded_speed,
129
    )
130

131

132
def _parse_ratios_from_summary(summary: str) -> Tuple[Optional[float], Optional[float]]:
1✔
133
    """Extract time and size ratios from a Markdown *summary* string."""
134

135
    time_ratio: Optional[float] = None
1✔
136
    size_ratio: Optional[float] = None
1✔
137

138
    for line in summary.splitlines():
1✔
139
        if "**Duration:**" in line:
1✔
140
            match = re.search(r"—\s*([0-9]+(?:\.[0-9]+)?)% of the original", line)
1✔
141
            if match:
1✔
142
                try:
1✔
143
                    time_ratio = float(match.group(1)) / 100
1✔
144
                except ValueError:
×
145
                    time_ratio = None
×
146
        elif "**Size:**" in line:
1✔
147
            match = re.search(r"\*\*Size:\*\*\s*([0-9]+(?:\.[0-9]+)?)%", line)
1✔
148
            if match:
1✔
149
                try:
1✔
150
                    size_ratio = float(match.group(1)) / 100
1✔
151
                except ValueError:
×
152
                    size_ratio = None
×
153

154
    return time_ratio, size_ratio
1✔
155

156

157
def _parse_source_duration_seconds(message: str) -> tuple[bool, Optional[float]]:
1✔
158
    """Return whether *message* includes source duration metadata."""
159

160
    metadata_match = re.search(
1✔
161
        r"source metadata: duration:\s*([\d.]+)s",
162
        message,
163
        re.IGNORECASE,
164
    )
165
    if not metadata_match:
1✔
166
        return False, None
×
167

168
    try:
1✔
169
        return True, float(metadata_match.group(1))
1✔
170
    except ValueError:
1✔
171
        return True, None
1✔
172

173

174
def _parse_encode_total_frames(message: str) -> tuple[bool, Optional[int]]:
1✔
175
    """Extract final encode frame totals from *message* when present."""
176

177
    frame_total_match = re.search(
1✔
178
        r"Final encode target frames(?: \(fallback\))?:\s*(\d+)", message
179
    )
180
    if not frame_total_match:
1✔
181
        return False, None
1✔
182

183
    try:
1✔
184
        return True, int(frame_total_match.group(1))
1✔
185
    except ValueError:
×
186
        return True, None
×
187

188

189
def _is_encode_total_frames_unknown(normalized_message: str) -> bool:
1✔
190
    """Return ``True`` if *normalized_message* marks encode frame totals unknown."""
191

192
    return (
1✔
193
        "final encode target frames" in normalized_message
194
        and "unknown" in normalized_message
195
    )
196

197

198
def _parse_current_frame(message: str) -> tuple[bool, Optional[int]]:
1✔
199
    """Extract the current encode frame from *message* when available."""
200

201
    frame_match = re.search(r"frame=\s*(\d+)", message)
1✔
202
    if not frame_match:
1✔
203
        return False, None
1✔
204

205
    try:
1✔
206
        return True, int(frame_match.group(1))
1✔
207
    except ValueError:
×
208
        return True, None
×
209

210

211
def _parse_encode_target_duration(message: str) -> tuple[bool, Optional[float]]:
1✔
212
    """Extract encode target duration from *message* if reported."""
213

214
    encode_duration_match = re.search(
1✔
215
        r"Final encode target duration(?: \(fallback\))?:\s*([\d.]+)s",
216
        message,
217
    )
218
    if not encode_duration_match:
1✔
219
        return False, None
1✔
220

221
    try:
1✔
222
        return True, float(encode_duration_match.group(1))
1✔
223
    except ValueError:
1✔
224
        return True, None
1✔
225

226

227
def _is_encode_target_duration_unknown(normalized_message: str) -> bool:
1✔
228
    """Return ``True`` if encode target duration is reported as unknown."""
229

230
    return (
1✔
231
        "final encode target duration" in normalized_message
232
        and "unknown" in normalized_message
233
    )
234

235

236
def _parse_video_duration_seconds(message: str) -> tuple[bool, Optional[float]]:
1✔
237
    """Parse the input video duration from *message* when FFmpeg prints it."""
238

239
    duration_match = re.search(r"Duration:\s*(\d{2}):(\d{2}):(\d{2}\.\d+)", message)
1✔
240
    if not duration_match:
1✔
241
        return False, None
×
242

243
    try:
1✔
244
        hours = int(duration_match.group(1))
1✔
245
        minutes = int(duration_match.group(2))
1✔
246
        seconds = float(duration_match.group(3))
1✔
247
    except ValueError:
×
248
        return True, None
×
249

250
    total_seconds = hours * 3600 + minutes * 60 + seconds
1✔
251
    return True, total_seconds
1✔
252

253

254
def _parse_ffmpeg_progress(message: str) -> tuple[bool, Optional[tuple[int, str]]]:
1✔
255
    """Parse FFmpeg progress information from *message* if available."""
256

257
    time_match = re.search(r"time=(\d{2}):(\d{2}):(\d{2})\.\d+", message)
1✔
258
    speed_match = re.search(r"speed=\s*([\d.]+)x", message)
1✔
259

260
    if not (time_match and speed_match):
1✔
261
        return False, None
×
262

263
    try:
1✔
264
        hours = int(time_match.group(1))
1✔
265
        minutes = int(time_match.group(2))
1✔
266
        seconds = int(time_match.group(3))
1✔
267
    except ValueError:
×
268
        return True, None
×
269

270
    current_seconds = hours * 3600 + minutes * 60 + seconds
1✔
271
    speed_str = speed_match.group(1)
1✔
272
    return True, (current_seconds, speed_str)
1✔
273

274

275
class TalksReducerGUI:
1✔
276
    """Tkinter application mirroring the CLI options with form controls."""
277

278
    PADDING = 10
1✔
279
    AUDIO_PROCESSING_RATIO = 0.02
1✔
280
    AUDIO_PROGRESS_STEPS = 100
1✔
281
    AUDIO_PROGRESS_WEIGHT = 5.0
1✔
282
    MIN_AUDIO_INTERVAL_MS = 10
1✔
283
    DEFAULT_AUDIO_INTERVAL_MS = 200
1✔
284

285
    def __init__(
1✔
286
        self,
287
        initial_inputs: Optional[Sequence[str]] = None,
288
        *,
289
        auto_run: bool = False,
290
    ) -> None:
291
        self._config_path = determine_config_path()
×
292
        self.preferences = GUIPreferences(self._config_path)
×
293

294
        # Import tkinter here to avoid loading it at module import time
295
        import tkinter as tk
×
296
        from tkinter import filedialog, messagebox, ttk
×
297

298
        # Store references for use in methods
299
        self.tk = tk
×
300
        self.filedialog = filedialog
×
301
        self.messagebox = messagebox
×
302
        self.ttk = ttk
×
303

304
        if TkinterDnD is not None:
×
305
            self.root = TkinterDnD.Tk()  # type: ignore[call-arg]
×
306
        else:
307
            self.root = tk.Tk()
×
308

309
        # Set window title with version information
310
        app_version = resolve_version()
×
311
        if app_version and app_version != "unknown":
×
312
            self.root.title(f"Talks Reducer v{app_version}")
×
313
        else:
314
            self.root.title("Talks Reducer")
×
315

316
        self._apply_window_icon()
×
317

318
        self._full_size = (1200, 900)
×
319
        self._simple_size = (363, 270)
×
320
        # self.root.geometry(f"{self._full_size[0]}x{self._full_size[1]}")
321
        self.style = self.ttk.Style(self.root)
×
322

323
        self._processing_thread: Optional[threading.Thread] = None
×
324
        self._last_output: Optional[Path] = None
×
325
        self._last_time_ratio: Optional[float] = None
×
326
        self._last_size_ratio: Optional[float] = None
×
327
        self._last_progress_seconds: Optional[int] = None
×
328
        self._run_start_time: Optional[float] = None
×
329
        self._status_state = "Idle"
×
330
        self.status_var = tk.StringVar(value=self._status_state)
×
331
        self._status_animation_job: Optional[str] = None
×
332
        self._status_animation_phase = 0
×
333
        self._video_duration_seconds: Optional[float] = None
×
334
        self._encode_target_duration_seconds: Optional[float] = None
×
335
        self._encode_total_frames: Optional[int] = None
×
336
        self._encode_current_frame: Optional[int] = None
×
337
        self._source_duration_seconds: Optional[float] = None
×
338
        self._audio_progress_job: Optional[str] = None
×
339
        self._audio_progress_interval_ms: Optional[int] = None
×
340
        self._audio_progress_steps_completed = 0
×
341
        self.progress_var = tk.DoubleVar(value=0.0)
×
342
        self._ffmpeg_process: Optional[subprocess.Popen] = None
×
343
        self._stop_requested = False
×
344
        self._ping_worker_stop_requested = False
×
345
        self._current_remote_mode = False
×
346

347
        self.input_files: List[str] = []
×
348

349
        self._dnd_available = TkinterDnD is not None and DND_FILES is not None
×
350

351
        self.simple_mode_var = tk.BooleanVar(
×
352
            value=self.preferences.get("simple_mode", True)
353
        )
354
        self.run_after_drop_var = tk.BooleanVar(value=True)
×
355
        self.small_var = tk.BooleanVar(value=self.preferences.get("small_video", True))
×
356
        self.small_480_var = tk.BooleanVar(
×
357
            value=self.preferences.get("small_video_480", False)
358
        )
359
        self.open_after_convert_var = tk.BooleanVar(
×
360
            value=self.preferences.get("open_after_convert", True)
361
        )
362
        stored_codec = str(self.preferences.get("video_codec", "hevc")).lower()
×
363
        if stored_codec not in {"h264", "hevc", "av1"}:
×
364
            stored_codec = "hevc"
×
365
            self.preferences.update("video_codec", stored_codec)
×
366
        prefer_global = bool(self.preferences.get("use_global_ffmpeg", False))
×
367
        self.global_ffmpeg_available = is_global_ffmpeg_available()
×
368
        if prefer_global and not self.global_ffmpeg_available:
×
369
            prefer_global = False
×
370
            self.preferences.update("use_global_ffmpeg", False)
×
371
        self.video_codec_var = tk.StringVar(value=stored_codec)
×
372
        self.add_codec_suffix_var = tk.BooleanVar(
×
373
            value=bool(self.preferences.get("add_codec_suffix", False))
374
        )
375
        self.use_global_ffmpeg_var = tk.BooleanVar(value=prefer_global)
×
376
        stored_mode = str(self.preferences.get("processing_mode", "local"))
×
377
        if stored_mode not in {"local", "remote"}:
×
378
            stored_mode = "local"
×
379
        self.processing_mode_var = tk.StringVar(value=stored_mode)
×
380
        self.processing_mode_var.trace_add("write", self._on_processing_mode_change)
×
381
        self.theme_var = tk.StringVar(value=self.preferences.get("theme", "os"))
×
382
        self.theme_var.trace_add("write", self._on_theme_change)
×
383
        self.small_var.trace_add("write", self._on_small_video_change)
×
384
        self.small_480_var.trace_add("write", self._on_small_480_change)
×
385
        self.open_after_convert_var.trace_add(
×
386
            "write", self._on_open_after_convert_change
387
        )
388
        self.video_codec_var.trace_add("write", self._on_video_codec_change)
×
389
        self.add_codec_suffix_var.trace_add("write", self._on_add_codec_suffix_change)
×
390
        self.use_global_ffmpeg_var.trace_add("write", self._on_use_global_ffmpeg_change)
×
391
        self.server_url_var = tk.StringVar(
×
392
            value=str(self.preferences.get("server_url", ""))
393
        )
394
        self.server_url_var.trace_add("write", self._on_server_url_change)
×
395
        self._discovery_thread: Optional[threading.Thread] = None
×
396

397
        self._basic_defaults: dict[str, float] = {}
×
398
        self._basic_variables: dict[str, tk.DoubleVar] = {}
×
399
        self._slider_updaters: dict[str, Callable[[str], None]] = {}
×
400
        self._sliders: list[tk.Scale] = []
×
401

402
        self._build_layout()
×
403
        self._update_small_variant_state()
×
404
        self._apply_simple_mode(initial=True)
×
405
        self._apply_status_style(self._status_state)
×
406
        self._refresh_theme()
×
407
        self.preferences.save()
×
408
        self._hide_stop_button()
×
409

410
        # Ping server on startup if in remote mode
411
        if (
×
412
            self.processing_mode_var.get() == "remote"
413
            and self.server_url_var.get().strip()
414
        ):
415
            server_url = self.server_url_var.get().strip()
×
416

417
            def ping_worker() -> None:
×
418
                try:
×
419
                    self._check_remote_server(
×
420
                        server_url,
421
                        success_status="Idle",
422
                        waiting_status="Error",
423
                        failure_status="Error",
424
                        stop_check=lambda: self._ping_worker_stop_requested,
425
                        switch_to_local_on_failure=True,
426
                    )
427
                except Exception as exc:  # pragma: no cover - defensive safeguard
428
                    host_label = self._format_server_host(server_url)
429
                    message = f"Error pinging server {host_label}: {exc}"
430
                    self._schedule_on_ui_thread(
431
                        lambda msg=message: self._append_log(msg)
432
                    )
433
                    self._schedule_on_ui_thread(
434
                        lambda msg=message: self._set_status("Idle", msg)
435
                    )
436

437
            threading.Thread(target=ping_worker, daemon=True).start()
×
438

439
        if not self._dnd_available:
×
440
            self._append_log(
×
441
                "Drag and drop requires the tkinterdnd2 package. Install it to enable the drop zone."
442
            )
443

444
        if initial_inputs:
×
445
            self._populate_initial_inputs(initial_inputs, auto_run=auto_run)
×
446

447
    def _start_run(self) -> None:
1✔
448
        if self._processing_thread and self._processing_thread.is_alive():
×
449
            self.messagebox.showinfo("Processing", "A job is already running.")
×
450
            return
×
451

452
        if not self.input_files:
×
453
            self.messagebox.showwarning(
×
454
                "Missing input", "Please add at least one file or folder."
455
            )
456
            return
×
457

458
        try:
×
459
            args = self._collect_arguments()
×
460
        except ValueError as exc:
×
461
            self.messagebox.showerror("Invalid value", str(exc))
×
462
            return
×
463

464
        self._append_log("Starting processing…")
×
465
        self._stop_requested = False
×
466
        self.stop_button.configure(text="Stop")
×
467
        self._run_start_time = time.monotonic()
×
468
        self._ping_worker_stop_requested = True
×
469
        open_after_convert = bool(self.open_after_convert_var.get())
×
470
        server_url = self.server_url_var.get().strip()
×
471
        remote_mode = self.processing_mode_var.get() == "remote"
×
472
        if remote_mode and not server_url:
×
473
            self.messagebox.showerror(
×
474
                "Missing server URL", "Remote mode requires a server URL."
475
            )
476
        remote_mode = remote_mode and bool(server_url)
×
477

478
        # Store remote_mode for use after thread starts
479
        self._current_remote_mode = remote_mode
×
480

481
        def worker() -> None:
×
482
            def set_process(proc: subprocess.Popen) -> None:
×
483
                self._ffmpeg_process = proc
×
484

485
            try:
×
486
                files = gather_input_files(self.input_files)
×
487
                if not files:
×
488
                    self._schedule_on_ui_thread(
×
489
                        lambda: self.messagebox.showwarning(
490
                            "No files", "No supported media files were found."
491
                        )
492
                    )
493
                    self._set_status("Idle")
×
494
                    return
×
495

496
                if self._current_remote_mode:
×
497
                    success = self._process_files_via_server(
×
498
                        files,
499
                        args,
500
                        server_url,
501
                        open_after_convert=open_after_convert,
502
                    )
503
                    if success:
×
504
                        self._schedule_on_ui_thread(self._hide_stop_button)
×
505
                        return
×
506
                    # If server processing failed, fall back to local processing
507
                    # The _process_files_via_server function already switched to local mode
508
                    # Update remote_mode variable to reflect the change
509
                    self._current_remote_mode = False
×
510

511
                reporter = _TkProgressReporter(
×
512
                    self._append_log,
513
                    process_callback=set_process,
514
                    stop_callback=lambda: self._stop_requested,
515
                )
516
                for index, file in enumerate(files, start=1):
×
517
                    self._append_log(f"Processing: {os.path.basename(file)}")
×
518
                    options = self._create_processing_options(Path(file), args)
×
519
                    result = speed_up_video(options, reporter=reporter)
×
520
                    self._last_output = result.output_file
×
521
                    self._last_time_ratio = result.time_ratio
×
522
                    self._last_size_ratio = result.size_ratio
×
523

524
                    # Create completion message with ratios if available
525
                    completion_msg = f"Completed: {result.output_file}"
×
526
                    if result.time_ratio is not None and result.size_ratio is not None:
×
527
                        completion_msg += f" (Time: {result.time_ratio:.2%}, Size: {result.size_ratio:.2%})"
×
528

529
                    self._append_log(completion_msg)
×
530
                    if open_after_convert:
×
531
                        self._schedule_on_ui_thread(
×
532
                            lambda path=result.output_file: self._open_in_file_manager(
533
                                path
534
                            )
535
                        )
536

537
                self._append_log("All jobs finished successfully.")
×
538
                self._schedule_on_ui_thread(
×
539
                    lambda: self.open_button.configure(state=self.tk.NORMAL)
540
                )
541
                self._schedule_on_ui_thread(self._clear_input_files)
×
542
            except FFmpegNotFoundError as exc:
×
543
                self._schedule_on_ui_thread(
×
544
                    lambda: self.messagebox.showerror("FFmpeg not found", str(exc))
545
                )
546
                self._set_status("Error")
×
547
            except ProcessingAborted:
×
548
                self._append_log("Processing aborted by user.")
×
549
                self._set_status("Aborted")
×
550
            except Exception as exc:  # pragma: no cover - GUI level safeguard
551
                # If stop was requested, don't show error (FFmpeg termination is expected)
552
                if self._stop_requested:
553
                    self._append_log("Processing aborted by user.")
554
                    self._set_status("Aborted")
555
                else:
556
                    error_msg = f"Processing failed: {exc}"
557
                    self._append_log(error_msg)
558
                    print(error_msg, file=sys.stderr)  # Also output to console
559
                    self._schedule_on_ui_thread(
560
                        lambda: self.messagebox.showerror("Error", error_msg)
561
                    )
562
                    self._set_status("Error")
563
            finally:
564
                self._run_start_time = None
×
565
                self._schedule_on_ui_thread(self._hide_stop_button)
×
566

567
        self._processing_thread = threading.Thread(target=worker, daemon=True)
×
568
        self._processing_thread.start()
×
569

570
        # Show Stop button when processing starts regardless of mode
571
        self.stop_button.grid()
×
572

573
    # ------------------------------------------------------------------ UI --
574
    def _apply_window_icon(self) -> None:
1✔
575
        layout_helpers.apply_window_icon(self)
×
576

577
    def _build_layout(self) -> None:
1✔
578
        layout_helpers.build_layout(self)
×
579

580
    def _update_basic_reset_state(self) -> None:
1✔
581
        layout_helpers.update_basic_reset_state(self)
×
582

583
    def _reset_basic_defaults(self) -> None:
1✔
584
        layout_helpers.reset_basic_defaults(self)
×
585

586
    def _apply_basic_preset(self, preset: str) -> None:
1✔
NEW
587
        layout_helpers.apply_basic_preset(self, preset)
×
588

589
    def _update_processing_mode_state(self) -> None:
1✔
590
        has_url = bool(self.server_url_var.get().strip())
×
591
        if not has_url and self.processing_mode_var.get() == "remote":
×
592
            self.processing_mode_var.set("local")
×
593
            return
×
594

595
        if hasattr(self, "remote_mode_button"):
×
596
            state = self.tk.NORMAL if has_url else self.tk.DISABLED
×
597
            self.remote_mode_button.configure(state=state)
×
598

599
    def _normalize_server_url(self, server_url: str) -> str:
1✔
600
        return normalize_server_url(server_url)
×
601

602
    def _format_server_host(self, server_url: str) -> str:
1✔
603
        return format_server_host(server_url)
×
604

605
    def _check_remote_server(
1✔
606
        self,
607
        server_url: str,
608
        *,
609
        success_status: str,
610
        waiting_status: str,
611
        failure_status: str,
612
        success_message: Optional[str] = None,
613
        waiting_message_template: str = "Waiting server {host} (attempt {attempt}/{max_attempts})",
614
        failure_message: Optional[str] = None,
615
        stop_check: Optional[Callable[[], bool]] = None,
616
        on_stop: Optional[Callable[[], None]] = None,
617
        switch_to_local_on_failure: bool = False,
618
        alert_on_failure: bool = False,
619
        warning_title: str = "Server unavailable",
620
        warning_message: Optional[str] = None,
621
        max_attempts: int = 5,
622
        delay: float = 1.0,
623
    ) -> bool:
624
        return check_remote_server_for_gui(
×
625
            self,
626
            server_url,
627
            success_status=success_status,
628
            waiting_status=waiting_status,
629
            failure_status=failure_status,
630
            success_message=success_message,
631
            waiting_message_template=waiting_message_template,
632
            failure_message=failure_message,
633
            stop_check=stop_check,
634
            on_stop=on_stop,
635
            switch_to_local_on_failure=switch_to_local_on_failure,
636
            alert_on_failure=alert_on_failure,
637
            warning_title=warning_title,
638
            warning_message=warning_message,
639
            max_attempts=max_attempts,
640
            delay=delay,
641
        )
642

643
    def _ping_server(self, server_url: str, *, timeout: float = 5.0) -> bool:
1✔
644
        return ping_server(server_url, timeout=timeout)
×
645

646
    def _start_discovery(self) -> None:
1✔
647
        discovery_helpers.start_discovery(self)
×
648

649
    def _on_discovery_failed(self, exc: Exception) -> None:
1✔
650
        discovery_helpers.on_discovery_failed(self, exc)
×
651

652
    def _on_discovery_progress(self, current: int, total: int) -> None:
1✔
653
        discovery_helpers.on_discovery_progress(self, current, total)
×
654

655
    def _on_discovery_complete(self, urls: List[str]) -> None:
1✔
656
        discovery_helpers.on_discovery_complete(self, urls)
×
657

658
    def _show_discovery_results(self, urls: List[str]) -> None:
1✔
659
        discovery_helpers.show_discovery_results(self, urls)
×
660

661
    def _toggle_simple_mode(self) -> None:
1✔
662
        self.preferences.update("simple_mode", self.simple_mode_var.get())
×
663
        self._apply_simple_mode()
×
664

665
    def _apply_simple_mode(self, *, initial: bool = False) -> None:
1✔
666
        layout_helpers.apply_simple_mode(self, initial=initial)
×
667

668
    def _apply_window_size(self, *, simple: bool) -> None:
1✔
669
        layout_helpers.apply_window_size(self, simple=simple)
×
670

671
    def _toggle_advanced(self, *, initial: bool = False) -> None:
1✔
672
        if not initial:
×
673
            self.advanced_visible.set(not self.advanced_visible.get())
×
674
        visible = self.advanced_visible.get()
×
675
        if visible:
×
676
            self.advanced_frame.grid()
×
677
            self.advanced_button.configure(text="Hide advanced")
×
678
        else:
679
            self.advanced_frame.grid_remove()
×
680
            self.advanced_button.configure(text="Advanced")
×
681

682
    def _on_theme_change(self, *_: object) -> None:
1✔
683
        self.preferences.update("theme", self.theme_var.get())
×
684
        self._refresh_theme()
×
685

686
    def _on_small_video_change(self, *_: object) -> None:
1✔
687
        self.preferences.update("small_video", bool(self.small_var.get()))
×
688
        self._update_small_variant_state()
×
689

690
    def _on_small_480_change(self, *_: object) -> None:
1✔
691
        self.preferences.update("small_video_480", bool(self.small_480_var.get()))
×
692

693
    def _update_small_variant_state(self) -> None:
1✔
694
        if not hasattr(self, "small_480_check"):
×
695
            return
×
696
        state = self.tk.NORMAL if self.small_var.get() else self.tk.DISABLED
×
697
        self.small_480_check.configure(state=state)
×
698

699
    def _on_open_after_convert_change(self, *_: object) -> None:
1✔
700
        self.preferences.update(
×
701
            "open_after_convert", bool(self.open_after_convert_var.get())
702
        )
703

704
    def _on_video_codec_change(self, *_: object) -> None:
1✔
705
        value = self.video_codec_var.get().strip().lower()
×
706
        if value not in {"h264", "hevc", "av1"}:
×
707
            value = "hevc"
×
708
            self.video_codec_var.set(value)
×
709
        self.preferences.update("video_codec", value)
×
710

711
    def _on_add_codec_suffix_change(self, *_: object) -> None:
1✔
712
        self.preferences.update(
×
713
            "add_codec_suffix", bool(self.add_codec_suffix_var.get())
714
        )
715

716
    def _on_use_global_ffmpeg_change(self, *_: object) -> None:
1✔
717
        self.preferences.update(
×
718
            "use_global_ffmpeg", bool(self.use_global_ffmpeg_var.get())
719
        )
720

721
    def _on_processing_mode_change(self, *_: object) -> None:
1✔
722
        value = self.processing_mode_var.get()
×
723
        if value not in {"local", "remote"}:
×
724
            self.processing_mode_var.set("local")
×
725
            return
×
726
        self.preferences.update("processing_mode", value)
×
727
        self._update_processing_mode_state()
×
728

729
        if self.processing_mode_var.get() == "remote":
×
730
            server_url = self.server_url_var.get().strip()
×
731
            if not server_url:
×
732
                return
×
733

734
            def ping_remote_mode() -> None:
×
735
                self._check_remote_server(
×
736
                    server_url,
737
                    success_status="Idle",
738
                    waiting_status="Error",
739
                    failure_status="Error",
740
                    failure_message="Server {host} is unreachable. Switching to local mode.",
741
                    switch_to_local_on_failure=True,
742
                    alert_on_failure=True,
743
                    warning_message="Server {host} is unreachable. Switching to local mode.",
744
                )
745

746
            threading.Thread(target=ping_remote_mode, daemon=True).start()
×
747

748
    def _on_server_url_change(self, *_: object) -> None:
1✔
749
        value = self.server_url_var.get().strip()
×
750
        self.preferences.update("server_url", value)
×
751
        self._update_processing_mode_state()
×
752

753
    def _resolve_theme_mode(self) -> str:
1✔
754
        preference = self.theme_var.get().lower()
×
755
        if preference not in {"light", "dark"}:
×
756
            return detect_system_theme(
×
757
                os.environ,
758
                sys.platform,
759
                read_windows_theme_registry,
760
                run_defaults_command,
761
            )
762
        return preference
×
763

764
    def _refresh_theme(self) -> None:
1✔
765
        mode = self._resolve_theme_mode()
×
766
        palette = LIGHT_THEME if mode == "light" else DARK_THEME
×
767
        apply_theme(
×
768
            self.style,
769
            palette,
770
            {
771
                "root": self.root,
772
                "drop_zone": getattr(self, "drop_zone", None),
773
                "log_text": getattr(self, "log_text", None),
774
                "status_label": getattr(self, "status_label", None),
775
                "sliders": getattr(self, "_sliders", []),
776
                "tk": self.tk,
777
                "apply_status_style": self._apply_status_style,
778
                "status_state": self._status_state,
779
            },
780
        )
781

782
    def _configure_drop_targets(self, widget) -> None:
1✔
783
        if not self._dnd_available:
×
784
            return
×
785
        widget.drop_target_register(DND_FILES)  # type: ignore[arg-type]
×
786
        widget.dnd_bind("<<Drop>>", self._on_drop)  # type: ignore[attr-defined]
×
787

788
    def _populate_initial_inputs(
1✔
789
        self, inputs: Sequence[str], *, auto_run: bool = False
790
    ) -> None:
791
        """Seed the GUI with preselected inputs and optionally start processing."""
792

793
        normalized: list[str] = []
×
794
        for path in inputs:
×
795
            if not path:
×
796
                continue
×
797
            resolved = os.fspath(Path(path))
×
798
            if resolved not in self.input_files:
×
799
                self.input_files.append(resolved)
×
800
                normalized.append(resolved)
×
801

802
        if auto_run and normalized:
×
803
            # Kick off processing once the event loop becomes idle so the
804
            # interface has a chance to render before the work starts.
805
            self.root.after_idle(self._start_run)
×
806

807
    # -------------------------------------------------------------- actions --
808
    def _ask_for_input_files(self) -> tuple[str, ...]:
1✔
809
        """Prompt the user to select input files for processing."""
810

811
        return self.filedialog.askopenfilenames(
×
812
            title="Select input files",
813
            filetypes=[
814
                ("Video files", "*.mp4 *.mkv *.mov *.avi *.m4v"),
815
                ("All", "*.*"),
816
            ],
817
        )
818

819
    def _add_files(self) -> None:
1✔
820
        files = self._ask_for_input_files()
×
821
        self._extend_inputs(files)
×
822

823
    def _add_directory(self) -> None:
1✔
824
        directory = self.filedialog.askdirectory(title="Select input folder")
×
825
        if directory:
×
826
            self._extend_inputs([directory])
×
827

828
    def _extend_inputs(self, paths: Iterable[str], *, auto_run: bool = False) -> None:
1✔
829
        added = False
×
830
        for path in paths:
×
831
            if path and path not in self.input_files:
×
832
                self.input_files.append(path)
×
833
                added = True
×
834
        if auto_run and added and self.run_after_drop_var.get():
×
835
            self._start_run()
×
836

837
    def _clear_input_files(self) -> None:
1✔
838
        """Clear all queued input files."""
839
        self.input_files.clear()
×
840

841
    def _on_drop(self, event: object) -> None:
1✔
842
        data = getattr(event, "data", "")
×
843
        if not data:
×
844
            return
×
845
        paths = self.root.tk.splitlist(data)
×
846
        cleaned = [path.strip("{}") for path in paths]
×
847
        # Clear existing files before adding dropped files
848
        self.input_files.clear()
×
849
        self._extend_inputs(cleaned, auto_run=True)
×
850

851
    def _on_drop_zone_click(self, event: object) -> str | None:
1✔
852
        """Open a file selection dialog when the drop zone is activated."""
853

854
        files = self._ask_for_input_files()
×
855
        if not files:
×
856
            return "break"
×
857
        self._clear_input_files()
×
858
        self._extend_inputs(files, auto_run=True)
×
859
        return "break"
×
860

861
    def _browse_path(
1✔
862
        self, variable, label: str
863
    ) -> None:  # type: (tk.StringVar, str) -> None
864
        if "folder" in label.lower():
×
865
            result = self.filedialog.askdirectory()
×
866
        else:
867
            initial = variable.get() or os.getcwd()
×
868
            result = self.filedialog.asksaveasfilename(
×
869
                initialfile=os.path.basename(initial)
870
            )
871
        if result:
×
872
            variable.set(result)
×
873

874
    def _stop_processing(self) -> None:
1✔
875
        """Stop the currently running processing by terminating FFmpeg."""
876
        import signal
×
877

878
        self._stop_requested = True
×
879
        # Update button text to indicate stopping state
880
        self.stop_button.configure(text="Stopping...")
×
881
        if self._current_remote_mode:
×
882
            self._append_log("Cancelling remote job...")
×
883
        elif self._ffmpeg_process and self._ffmpeg_process.poll() is None:
×
884
            self._append_log("Stopping FFmpeg process...")
×
885
            try:
×
886
                # Send SIGTERM to FFmpeg process
887
                if sys.platform == "win32":
×
888
                    # Windows doesn't have SIGTERM, use terminate()
889
                    self._ffmpeg_process.terminate()
×
890
                else:
891
                    # Unix-like systems can use SIGTERM
892
                    self._ffmpeg_process.send_signal(signal.SIGTERM)
×
893

894
                self._append_log("FFmpeg process stopped.")
×
895
            except Exception as e:
×
896
                self._append_log(f"Error stopping process: {e}")
×
897
        else:
898
            self._append_log("No active FFmpeg process to stop.")
×
899

900
        self._hide_stop_button()
×
901

902
    def _hide_stop_button(self) -> None:
1✔
903
        """Hide Stop button."""
904
        self.stop_button.grid_remove()
×
905
        # Show drop hint when stop button is hidden and no other buttons are visible
906
        if (
×
907
            not self.open_button.winfo_viewable()
908
            and hasattr(self, "drop_hint_button")
909
            and not self.drop_hint_button.winfo_viewable()
910
        ):
911
            self.drop_hint_button.grid()
×
912

913
    def _collect_arguments(self) -> dict[str, object]:
1✔
914
        args: dict[str, object] = {}
1✔
915

916
        if self.output_var.get():
1✔
917
            args["output_file"] = Path(self.output_var.get())
×
918
        if self.temp_var.get():
1✔
919
            args["temp_folder"] = Path(self.temp_var.get())
×
920
        silent_threshold = float(self.silent_threshold_var.get())
1✔
921
        args["silent_threshold"] = round(silent_threshold, 2)
1✔
922

923
        codec_value = self.video_codec_var.get().strip().lower()
1✔
924
        if codec_value not in {"h264", "hevc", "av1"}:
1✔
925
            codec_value = "hevc"
×
926
            self.video_codec_var.set(codec_value)
×
927
        args["video_codec"] = codec_value
1✔
928
        if self.add_codec_suffix_var.get():
1✔
929
            args["add_codec_suffix"] = True
1✔
930
        args["prefer_global_ffmpeg"] = bool(self.use_global_ffmpeg_var.get())
1✔
931

932
        sounded_speed = float(self.sounded_speed_var.get())
1✔
933
        args["sounded_speed"] = round(sounded_speed, 2)
1✔
934

935
        silent_speed = float(self.silent_speed_var.get())
1✔
936
        args["silent_speed"] = round(silent_speed, 2)
1✔
937
        if self.frame_margin_var.get():
1✔
938
            args["frame_spreadage"] = int(
1✔
939
                round(self._parse_float(self.frame_margin_var.get(), "Frame margin"))
940
            )
941
        if self.sample_rate_var.get():
1✔
942
            args["sample_rate"] = int(
1✔
943
                round(self._parse_float(self.sample_rate_var.get(), "Sample rate"))
944
            )
945
        if self.keyframe_interval_var.get():
1✔
946
            interval = float(self.keyframe_interval_var.get())
1✔
947
            if interval <= 0:
1✔
948
                raise ValueError("Keyframe interval must be positive.")
×
949
            clamped_interval = float(f"{interval:.6f}")
1✔
950
            args["keyframe_interval_seconds"] = clamped_interval
1✔
951
            self.preferences.update("keyframe_interval_seconds", clamped_interval)
1✔
952
        if self.small_var.get():
1✔
953
            args["small"] = True
×
954
            if self.small_480_var.get():
×
955
                args["small_target_height"] = 480
×
956
        return args
1✔
957

958
    def _process_files_via_server(
1✔
959
        self,
960
        files: List[str],
961
        args: dict[str, object],
962
        server_url: str,
963
        *,
964
        open_after_convert: bool,
965
    ) -> bool:
966
        """Send *files* to the configured server for processing."""
967

968
        return process_files_via_server(
×
969
            self,
970
            files,
971
            args,
972
            server_url,
973
            open_after_convert=open_after_convert,
974
            default_remote_destination=_default_remote_destination,
975
            parse_summary=_parse_ratios_from_summary,
976
        )
977

978
    def _parse_float(self, value: str, label: str) -> float:
1✔
979
        try:
×
980
            return float(value)
×
981
        except ValueError as exc:  # pragma: no cover - input validation
982
            raise ValueError(f"{label} must be a number.") from exc
983

984
    def _create_processing_options(
1✔
985
        self, input_file: Path, args: dict[str, object]
986
    ) -> ProcessingOptions:
987
        options = dict(args)
×
988
        options["input_file"] = input_file
×
989

990
        if "temp_folder" in options:
×
991
            options["temp_folder"] = Path(options["temp_folder"])
×
992

993
        return ProcessingOptions(**options)
×
994

995
    def _open_last_output(self) -> None:
1✔
996
        if self._last_output is not None:
×
997
            self._open_in_file_manager(self._last_output)
×
998

999
    def _open_in_file_manager(self, path: Path) -> None:
1✔
1000
        target = Path(path)
×
1001
        if sys.platform.startswith("win"):
×
1002
            command = ["explorer", f"/select,{target}"]
×
1003
        elif sys.platform == "darwin":
×
1004
            command = ["open", "-R", os.fspath(target)]
×
1005
        else:
1006
            command = [
×
1007
                "xdg-open",
1008
                os.fspath(target.parent if target.exists() else target),
1009
            ]
1010
        try:
×
1011
            subprocess.Popen(command)
×
1012
        except OSError:
×
1013
            self._append_log(f"Could not open file manager for {target}")
×
1014

1015
    def _append_log(self, message: str) -> None:
1✔
1016
        self._update_status_from_message(message)
×
1017

1018
        def updater() -> None:
×
1019
            self.log_text.configure(state=self.tk.NORMAL)
×
1020
            self.log_text.insert(self.tk.END, message + "\n")
×
1021
            self.log_text.see(self.tk.END)
×
1022
            self.log_text.configure(state=self.tk.DISABLED)
×
1023

1024
        self.log_text.after(0, updater)
×
1025

1026
    def _update_status_from_message(self, message: str) -> None:
1✔
1027
        normalized = message.strip().lower()
×
1028

1029
        metadata_found, source_duration = _parse_source_duration_seconds(message)
×
1030
        if metadata_found:
×
1031
            self._source_duration_seconds = source_duration
×
1032

1033
        if self._handle_status_transitions(normalized):
×
1034
            return
×
1035

1036
        frame_total_found, frame_total = _parse_encode_total_frames(message)
×
1037
        if frame_total_found:
×
1038
            self._encode_total_frames = frame_total
×
1039
            return
×
1040

1041
        if _is_encode_total_frames_unknown(normalized):
×
1042
            self._encode_total_frames = None
×
1043
            return
×
1044

1045
        frame_found, current_frame = _parse_current_frame(message)
×
1046
        if frame_found:
×
1047
            if current_frame is None:
×
1048
                return
×
1049

1050
            if self._encode_current_frame == current_frame:
×
1051
                return
×
1052

1053
            self._encode_current_frame = current_frame
×
1054
            if self._encode_total_frames and self._encode_total_frames > 0:
×
1055
                self._complete_audio_phase()
×
1056
                frame_ratio = min(current_frame / self._encode_total_frames, 1.0)
×
1057
                progress_target = self.AUDIO_PROGRESS_WEIGHT + frame_ratio * (
×
1058
                    100.0 - self.AUDIO_PROGRESS_WEIGHT
1059
                )
1060
                current_value = float(self.progress_var.get())
×
1061
                percentage = min(100.0, max(current_value, progress_target))
×
1062
                self._set_progress(percentage)
×
1063
            else:
1064
                self._complete_audio_phase()
×
1065
                self._set_status("processing", f"{current_frame} frames encoded")
×
1066

1067
        duration_found, encode_duration = _parse_encode_target_duration(message)
×
1068
        if duration_found:
×
1069
            self._encode_target_duration_seconds = encode_duration
×
1070

1071
        if _is_encode_target_duration_unknown(normalized):
×
1072
            self._encode_target_duration_seconds = None
×
1073

1074
        video_duration_found, video_duration = _parse_video_duration_seconds(message)
×
1075
        if video_duration_found and video_duration is not None:
×
1076
            self._video_duration_seconds = video_duration
×
1077

1078
        progress_found, progress_info = _parse_ffmpeg_progress(message)
×
1079
        if progress_found and progress_info is not None:
×
1080
            current_seconds, speed_str = progress_info
×
1081
            time_str = self._format_progress_time(current_seconds)
×
1082

1083
            self._last_progress_seconds = current_seconds
×
1084

1085
            total_seconds = (
×
1086
                self._encode_target_duration_seconds or self._video_duration_seconds
1087
            )
1088
            if total_seconds:
×
1089
                total_str = self._format_progress_time(total_seconds)
×
1090
                time_display = f"{time_str} / {total_str}"
×
1091
            else:
1092
                time_display = time_str
×
1093

1094
            status_msg = f"{time_display}, {speed_str}x"
×
1095

1096
            if (
×
1097
                (
1098
                    not self._encode_total_frames
1099
                    or self._encode_total_frames <= 0
1100
                    or self._encode_current_frame is None
1101
                )
1102
                and total_seconds
1103
                and total_seconds > 0
1104
            ):
1105
                self._complete_audio_phase()
×
1106
                time_ratio = min(current_seconds / total_seconds, 1.0)
×
1107
                progress_target = self.AUDIO_PROGRESS_WEIGHT + time_ratio * (
×
1108
                    100.0 - self.AUDIO_PROGRESS_WEIGHT
1109
                )
1110
                current_value = float(self.progress_var.get())
×
1111
                percentage = min(100.0, max(current_value, progress_target))
×
1112
                self._set_progress(percentage)
×
1113

1114
            self._set_status("processing", status_msg)
×
1115

1116
    def _handle_status_transitions(self, normalized_message: str) -> bool:
1✔
1117
        """Handle high-level status transitions for *normalized_message*."""
1118

1119
        if "all jobs finished successfully" in normalized_message:
×
1120
            status_components: List[str] = []
×
1121
            if self._run_start_time is not None:
×
1122
                finish_time = time.monotonic()
×
1123
                runtime_seconds = max(0.0, finish_time - self._run_start_time)
×
1124
                duration_str = self._format_progress_time(runtime_seconds)
×
1125
                status_components.append(f"{duration_str}")
×
1126
            else:
1127
                finished_seconds = next(
×
1128
                    (
1129
                        value
1130
                        for value in (
1131
                            self._last_progress_seconds,
1132
                            self._encode_target_duration_seconds,
1133
                            self._video_duration_seconds,
1134
                        )
1135
                        if value is not None
1136
                    ),
1137
                    None,
1138
                )
1139

1140
                if finished_seconds is not None:
×
1141
                    duration_str = self._format_progress_time(finished_seconds)
×
1142
                    status_components.append(f"{duration_str}")
×
1143
                else:
1144
                    status_components.append("Finished")
×
1145

1146
            if self._last_time_ratio is not None and self._last_size_ratio is not None:
×
1147
                status_components.append(
×
1148
                    f"time: {self._last_time_ratio:.0%}, size: {self._last_size_ratio:.0%}"
1149
                )
1150

1151
            status_msg = ", ".join(status_components)
×
1152

1153
            self._reset_audio_progress_state(clear_source=True)
×
1154
            self._set_status("success", status_msg)
×
1155
            self._set_progress(100)
×
1156
            self._run_start_time = None
×
1157
            self._video_duration_seconds = None
×
1158
            self._encode_target_duration_seconds = None
×
1159
            self._encode_total_frames = None
×
1160
            self._encode_current_frame = None
×
1161
            self._last_progress_seconds = None
×
1162
            return True
×
1163

1164
        if normalized_message.startswith("extracting audio"):
×
1165
            self._reset_audio_progress_state(clear_source=False)
×
1166
            self._set_status("processing", "Extracting audio...")
×
1167
            self._set_progress(0)
×
1168
            self._video_duration_seconds = None
×
1169
            self._encode_target_duration_seconds = None
×
1170
            self._encode_total_frames = None
×
1171
            self._encode_current_frame = None
×
1172
            self._last_progress_seconds = None
×
1173
            self._start_audio_progress()
×
1174
            return False
×
1175

1176
        if normalized_message.startswith("uploading"):
×
1177
            self._set_status("processing", "Uploading...")
×
1178
            return False
×
1179

1180
        if normalized_message.startswith("starting processing"):
×
1181
            self._reset_audio_progress_state(clear_source=True)
×
1182
            self._set_status("processing", "Processing")
×
1183
            self._set_progress(0)
×
1184
            self._video_duration_seconds = None
×
1185
            self._encode_target_duration_seconds = None
×
1186
            self._encode_total_frames = None
×
1187
            self._encode_current_frame = None
×
1188
            self._last_progress_seconds = None
×
1189
            return False
×
1190

1191
        if normalized_message.startswith("processing"):
×
1192
            is_new_job = bool(re.match(r"processing \d+/\d+:", normalized_message))
×
1193
            should_reset = self._status_state.lower() != "processing" or is_new_job
×
1194
            if should_reset:
×
1195
                self._set_progress(0)
×
1196
                self._video_duration_seconds = None
×
1197
                self._encode_target_duration_seconds = None
×
1198
                self._encode_total_frames = None
×
1199
                self._encode_current_frame = None
×
1200
                self._last_progress_seconds = None
×
1201
            if is_new_job:
×
1202
                self._reset_audio_progress_state(clear_source=True)
×
1203
            self._set_status("processing", "Processing")
×
1204
            return False
×
1205

1206
        return False
×
1207

1208
    def _compute_audio_progress_interval(self) -> int:
1✔
1209
        duration = self._source_duration_seconds or self._video_duration_seconds
×
1210
        if duration and duration > 0:
×
1211
            audio_seconds = max(duration * self.AUDIO_PROCESSING_RATIO, 0.0)
×
1212
            interval_seconds = audio_seconds / self.AUDIO_PROGRESS_STEPS
×
1213
            interval_ms = int(round(interval_seconds * 1000))
×
1214
            return max(self.MIN_AUDIO_INTERVAL_MS, interval_ms)
×
1215
        return self.DEFAULT_AUDIO_INTERVAL_MS
×
1216

1217
    def _start_audio_progress(self) -> None:
1✔
1218
        interval_ms = self._compute_audio_progress_interval()
×
1219

1220
        def _start() -> None:
×
1221
            if self._audio_progress_job is not None:
×
1222
                self.root.after_cancel(self._audio_progress_job)
×
1223
            self._audio_progress_steps_completed = 0
×
1224
            self._audio_progress_interval_ms = interval_ms
×
1225
            self._audio_progress_job = self.root.after(
×
1226
                interval_ms, self._advance_audio_progress
1227
            )
1228

1229
        self._schedule_on_ui_thread(_start)
×
1230

1231
    def _advance_audio_progress(self) -> None:
1✔
1232
        self._audio_progress_job = None
×
1233
        if self._audio_progress_steps_completed >= self.AUDIO_PROGRESS_STEPS:
×
1234
            self._audio_progress_interval_ms = None
×
1235
            return
×
1236

1237
        self._audio_progress_steps_completed += 1
×
1238
        audio_percentage = (
×
1239
            self._audio_progress_steps_completed / self.AUDIO_PROGRESS_STEPS * 100
1240
        )
1241
        percentage = (audio_percentage / 100.0) * self.AUDIO_PROGRESS_WEIGHT
×
1242
        self._set_progress(percentage)
×
1243
        self._set_status("processing", f"Audio processing: {audio_percentage:.1f}%")
×
1244

1245
        if self._audio_progress_steps_completed < self.AUDIO_PROGRESS_STEPS:
×
1246
            interval_ms = (
×
1247
                self._audio_progress_interval_ms or self.DEFAULT_AUDIO_INTERVAL_MS
1248
            )
1249
            self._audio_progress_job = self.root.after(
×
1250
                interval_ms, self._advance_audio_progress
1251
            )
1252
        else:
1253
            self._audio_progress_interval_ms = None
×
1254

1255
    def _cancel_audio_progress(self) -> None:
1✔
1256
        if self._audio_progress_job is None:
×
1257
            self._audio_progress_interval_ms = None
×
1258
            return
×
1259

1260
        def _cancel() -> None:
×
1261
            if self._audio_progress_job is not None:
×
1262
                self.root.after_cancel(self._audio_progress_job)
×
1263
                self._audio_progress_job = None
×
1264
            self._audio_progress_interval_ms = None
×
1265

1266
        self._schedule_on_ui_thread(_cancel)
×
1267

1268
    def _reset_audio_progress_state(self, *, clear_source: bool) -> None:
1✔
1269
        if clear_source:
×
1270
            self._source_duration_seconds = None
×
1271
        self._audio_progress_steps_completed = 0
×
1272
        self._audio_progress_interval_ms = None
×
1273
        if self._audio_progress_job is not None:
×
1274
            self._cancel_audio_progress()
×
1275

1276
    def _complete_audio_phase(self) -> None:
1✔
1277
        def _complete() -> None:
×
1278
            if self._audio_progress_job is not None:
×
1279
                self.root.after_cancel(self._audio_progress_job)
×
1280
                self._audio_progress_job = None
×
1281
            self._audio_progress_interval_ms = None
×
1282
            if self._audio_progress_steps_completed < self.AUDIO_PROGRESS_STEPS:
×
1283
                self._audio_progress_steps_completed = self.AUDIO_PROGRESS_STEPS
×
1284
                current_value = float(self.progress_var.get())
×
1285
                if current_value < self.AUDIO_PROGRESS_WEIGHT:
×
1286
                    self._set_progress(self.AUDIO_PROGRESS_WEIGHT)
×
1287

1288
        self._schedule_on_ui_thread(_complete)
×
1289

1290
    def _get_status_style(self, status: str) -> str | None:
1✔
1291
        """Return the foreground color for *status* if a match is known."""
1292

1293
        color = STATUS_COLORS.get(status.lower())
1✔
1294
        if color:
1✔
1295
            return color
1✔
1296

1297
        status_lower = status.lower()
1✔
1298
        if "extracting audio" in status_lower:
1✔
1299
            return STATUS_COLORS["processing"]
1✔
1300

1301
        if re.search(
1✔
1302
            r"\d+:\d{2}(?::\d{2})?(?: / \d+:\d{2}(?::\d{2})?)?.*\d+\.?\d*x",
1303
            status,
1304
        ):
1305
            return STATUS_COLORS["processing"]
×
1306

1307
        if "time:" in status_lower and "size:" in status_lower:
1✔
1308
            # This is our new success format with ratios
1309
            return STATUS_COLORS["success"]
1✔
1310

1311
        return None
1✔
1312

1313
    def _apply_status_style(self, status: str) -> None:
1✔
1314
        color = self._get_status_style(status)
1✔
1315
        if color:
1✔
1316
            self.status_label.configure(fg=color)
1✔
1317

1318
    def _set_status(self, status: str, status_msg: str = "") -> None:
1✔
1319
        def apply() -> None:
×
1320
            self._status_state = status
×
1321
            # Use status_msg if provided, otherwise use status
1322
            display_text = status_msg if status_msg else status
×
1323
            self.status_var.set(display_text)
×
1324
            self._apply_status_style(
×
1325
                status
1326
            )  # Colors depend on status, not display text
1327
            self._set_progress_bar_style(status)
×
1328
            lowered = status.lower()
×
1329
            is_processing = lowered == "processing" or "extracting audio" in lowered
×
1330

1331
            if is_processing:
×
1332
                # Show stop button during processing
1333
                if hasattr(self, "status_frame"):
×
1334
                    self.status_frame.grid()
×
1335
                self.stop_button.grid()
×
1336
                self.drop_hint_button.grid_remove()
×
1337
            else:
1338
                self._reset_audio_progress_state(clear_source=True)
×
1339

1340
            if lowered == "success" or "time:" in lowered and "size:" in lowered:
×
1341
                if self.simple_mode_var.get() and hasattr(self, "status_frame"):
×
1342
                    self.status_frame.grid()
×
1343
                    self.stop_button.grid_remove()
×
1344
                self.drop_hint_button.grid_remove()
×
1345
                self.open_button.grid()
×
1346
                self.open_button.lift()  # Ensure open_button is above drop_hint_button
×
1347
                # print("success status")
1348
            else:
1349
                self.open_button.grid_remove()
×
1350
                # print("not success status")
1351
                if self.simple_mode_var.get() and not is_processing:
×
1352
                    self.stop_button.grid_remove()
×
1353
                    # Show drop hint when no other buttons are visible
1354
                    if hasattr(self, "drop_hint_button"):
×
1355
                        self.drop_hint_button.grid()
×
1356

1357
        self.root.after(0, apply)
×
1358

1359
    def _format_progress_time(self, total_seconds: float) -> str:
1✔
1360
        """Format a duration in seconds as h:mm:ss or m:ss for status display."""
1361

1362
        try:
1✔
1363
            rounded_seconds = max(0, int(round(total_seconds)))
1✔
1364
        except (TypeError, ValueError):
1✔
1365
            return "0:00"
1✔
1366

1367
        hours, remainder = divmod(rounded_seconds, 3600)
1✔
1368
        minutes, seconds = divmod(remainder, 60)
1✔
1369

1370
        if hours > 0:
1✔
1371
            return f"{hours}:{minutes:02d}:{seconds:02d}"
1✔
1372

1373
        total_minutes = rounded_seconds // 60
1✔
1374
        return f"{total_minutes}:{seconds:02d}"
1✔
1375

1376
    def _calculate_gradient_color(self, percentage: float, darken: float = 1.0) -> str:
1✔
1377
        """Calculate color gradient from red (0%) to green (100%).
1378

1379
        Args:
1380
            percentage: The position in the gradient (0-100)
1381
            darken: Value between 0.0 (black) and 1.0 (original brightness)
1382

1383
        Returns:
1384
            Hex color code string
1385
        """
1386
        # Clamp percentage between 0 and 100
1387
        percentage = max(0.0, min(100.0, float(percentage)))
1✔
1388
        # Clamp darken between 0.0 and 1.0
1389
        darken = max(0.0, min(1.0, darken))
1✔
1390

1391
        if percentage <= 50:
1✔
1392
            # Red to Yellow (0% to 50%)
1393
            # Red: (248, 113, 113) -> Yellow: (250, 204, 21)
1394
            ratio = percentage / 50.0
1✔
1395
            r = int((248 + (250 - 248) * ratio) * darken)
1✔
1396
            g = int((113 + (204 - 113) * ratio) * darken)
1✔
1397
            b = int((113 + (21 - 113) * ratio) * darken)
1✔
1398
        else:
1399
            # Yellow to Green (50% to 100%)
1400
            # Yellow: (250, 204, 21) -> Green: (34, 197, 94)
1401
            ratio = (percentage - 50) / 50.0
1✔
1402
            r = int((250 + (34 - 250) * ratio) * darken)
1✔
1403
            g = int((204 + (197 - 204) * ratio) * darken)
1✔
1404
            b = int((21 + (94 - 21) * ratio) * darken)
1✔
1405

1406
        # Ensure values are within 0-255 range after darkening
1407
        r = max(0, min(255, r))
1✔
1408
        g = max(0, min(255, g))
1✔
1409
        b = max(0, min(255, b))
1✔
1410

1411
        return f"#{r:02x}{g:02x}{b:02x}"
1✔
1412

1413
    def _set_progress(self, percentage: float) -> None:
1✔
1414
        """Update the progress bar value and color (thread-safe)."""
1415

1416
        def updater() -> None:
×
1417
            value = max(0.0, min(100.0, float(percentage)))
×
1418
            self.progress_var.set(value)
×
1419
            # Update color based on percentage gradient
1420
            color = self._calculate_gradient_color(value, 0.5)
×
1421
            palette = (
×
1422
                LIGHT_THEME if self._resolve_theme_mode() == "light" else DARK_THEME
1423
            )
1424
            if self.theme_var.get().lower() in {"light", "dark"}:
×
1425
                palette = (
×
1426
                    LIGHT_THEME
1427
                    if self.theme_var.get().lower() == "light"
1428
                    else DARK_THEME
1429
                )
1430

1431
            self.style.configure(
×
1432
                "Dynamic.Horizontal.TProgressbar",
1433
                background=color,
1434
                troughcolor=palette["surface"],
1435
                borderwidth=0,
1436
                thickness=20,
1437
            )
1438
            self.progress_bar.configure(style="Dynamic.Horizontal.TProgressbar")
×
1439

1440
            # Show stop button when progress < 100
1441
            if value < 100.0:
×
1442
                if hasattr(self, "status_frame"):
×
1443
                    self.status_frame.grid()
×
1444
                self.stop_button.grid()
×
1445
                self.drop_hint_button.grid_remove()
×
1446

1447
        self.root.after(0, updater)
×
1448

1449
    def _set_progress_bar_style(self, status: str) -> None:
1✔
1450
        """Update the progress bar color based on status."""
1451

1452
        def updater() -> None:
×
1453
            # Map status to progress bar style
1454
            status_lower = status.lower()
×
1455
            if status_lower == "success" or (
×
1456
                "time:" in status_lower and "size:" in status_lower
1457
            ):
1458
                style = "Success.Horizontal.TProgressbar"
×
1459
            elif status_lower == "error":
×
1460
                style = "Error.Horizontal.TProgressbar"
×
1461
            elif status_lower == "aborted":
×
1462
                style = "Aborted.Horizontal.TProgressbar"
×
1463
            elif status_lower == "idle":
×
1464
                style = "Idle.Horizontal.TProgressbar"
×
1465
            else:
1466
                # For processing states, use dynamic gradient (will be set by _set_progress)
1467
                return
×
1468

1469
            self.progress_bar.configure(style=style)
×
1470

1471
        self.root.after(0, updater)
×
1472

1473
    def _schedule_on_ui_thread(self, callback: Callable[[], None]) -> None:
1✔
1474
        self.root.after(0, callback)
×
1475

1476
    def run(self) -> None:
1✔
1477
        """Start the Tkinter event loop."""
1478

1479
        self.root.mainloop()
×
1480

1481

1482
__all__ = [
1✔
1483
    "TalksReducerGUI",
1484
    "_default_remote_destination",
1485
    "_parse_ratios_from_summary",
1486
]
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc