• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kimata / my-py-lib / 23367349689

pending completion
23367349689

push

github

kimata
fix(mercari): data-testid 属性ベースの XPath セレクタに変更

メルカリのサイト構造変更に対応し、脆弱な位置ベースの XPath を
data-testid 属性ベースに変更。価格・リンク・統計情報の取得を堅牢化。

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

3652 of 5885 relevant lines covered (62.06%)

0.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

35.24
/src/my_lib/selenium_util.py
1
#!/usr/bin/env python3
2
"""
3
Selenium を Chrome Driver を使って動かします。
4

5
Usage:
6
  selenium_util.py [-c CONFIG] [-D]
7

8
Options:
9
  -c CONFIG         : CONFIG を設定ファイルとして読み込んで実行します。
10
                      [default: tests/fixtures/config.example.yaml]
11
  -D                : デバッグモードで動作します。
12
"""
13

14
from __future__ import annotations
1✔
15

16
import datetime
1✔
17
import inspect
1✔
18
import io
1✔
19
import logging
1✔
20
import os
1✔
21
import pathlib
1✔
22
import random
1✔
23
import re
1✔
24
import shutil
1✔
25
import signal
1✔
26
import subprocess
1✔
27
import time
1✔
28
from typing import TYPE_CHECKING, Any, Self, TypeVar
1✔
29

30
import PIL.Image
1✔
31
import psutil
1✔
32
import selenium
1✔
33
import selenium.common.exceptions
1✔
34
import selenium.webdriver.chrome.options
1✔
35
import selenium.webdriver.chrome.service
1✔
36
import selenium.webdriver.common.action_chains
1✔
37
import selenium.webdriver.common.by
1✔
38
import selenium.webdriver.common.keys
1✔
39
import selenium.webdriver.support.expected_conditions
1✔
40

41
import my_lib.chrome_util
1✔
42
import my_lib.time
1✔
43

44
if TYPE_CHECKING:
45
    import types
46
    from collections.abc import Callable
47

48
    from selenium.webdriver.remote.webdriver import WebDriver
49
    from selenium.webdriver.support.wait import WebDriverWait
50

51
T = TypeVar("T")
1✔
52

53
WAIT_RETRY_COUNT: int = 1
1✔
54
LOG_MAX_BYTES: int = 10 * 1024 * 1024  # 10MB
1✔
55
LOG_BACKUP_COUNT: int = 3
1✔
56

57

58
class SeleniumError(Exception):
1✔
59
    """Selenium 関連エラーの基底クラス"""
60

61

62
def _get_rotated_path(log_file: pathlib.Path, index: int) -> pathlib.Path:
1✔
63
    """ローテーションされたログファイルのパスを生成"""
64
    return log_file.parent / f"{log_file.name}.{index}"
×
65

66

67
def _rotate_log_file(
1✔
68
    log_file: pathlib.Path,
69
    max_bytes: int = LOG_MAX_BYTES,
70
    backup_count: int = LOG_BACKUP_COUNT,
71
) -> bool:
72
    """ログファイルをローテーションする(copytruncate 方式)
73

74
    Chrome がファイルを開いたまま書き込みを続けるため、
75
    ファイルをコピーしてからトランケートする方式を採用。
76

77
    Args:
78
        log_file: ローテーション対象のログファイルパス
79
        max_bytes: ローテーションを実行するファイルサイズ閾値(バイト)
80
        backup_count: 保持するバックアップ世代数
81

82
    Returns:
83
        ローテーションを実行した場合は True
84

85
    """
86
    if not log_file.exists():
×
87
        return False
×
88

89
    try:
×
90
        file_size = log_file.stat().st_size
×
91
    except OSError:
×
92
        return False
×
93

94
    if file_size <= max_bytes:
×
95
        return False
×
96

97
    logging.info(
×
98
        "Rotating log file: %s (size: %.1f MB)",
99
        log_file.name,
100
        file_size / (1024 * 1024),
101
    )
102

103
    # 最古の世代を削除し、世代をシフト
104
    for i in range(backup_count, 0, -1):
×
105
        rotated = _get_rotated_path(log_file, i)
×
106
        if i == backup_count:
×
107
            rotated.unlink(missing_ok=True)
×
108
        elif rotated.exists():
×
109
            rotated.rename(_get_rotated_path(log_file, i + 1))
×
110

111
    # 現在のファイルを .1 にコピーしてからトランケート
112
    shutil.copy2(log_file, _get_rotated_path(log_file, 1))
×
113
    log_file.open("w").close()
×
114

115
    return True
×
116

117

118
def rotate_selenium_logs(
1✔
119
    log_path: pathlib.Path,
120
    max_bytes: int = LOG_MAX_BYTES,
121
    backup_count: int = LOG_BACKUP_COUNT,
122
) -> None:
123
    """Selenium 関連のログファイルをローテーションする
124

125
    Args:
126
        log_path: ログディレクトリのパス
127
        max_bytes: ローテーションを実行するファイルサイズ閾値(バイト、デフォルト: 10MB)
128
        backup_count: 保持するバックアップ世代数(デフォルト: 3)
129

130
    """
131
    if not log_path.exists():
×
132
        return
×
133

134
    for log_file in log_path.glob("*.log"):
×
135
        _rotate_log_file(log_file, max_bytes, backup_count)
×
136

137

138
def _get_chrome_version() -> int | None:
1✔
139
    try:
1✔
140
        result = subprocess.run(
1✔
141
            ["google-chrome", "--version"],  # noqa: S607
142
            capture_output=True,
143
            text=True,
144
            timeout=10,
145
            check=False,
146
        )
147
        match = re.search(r"(\d+)\.", result.stdout)
1✔
148
        if match:
1✔
149
            return int(match.group(1))
1✔
150
    except Exception:
1✔
151
        logging.warning("Failed to detect Chrome version")
1✔
152
    return None
1✔
153

154

155
def _create_chrome_options(
1✔
156
    profile_name: str,
157
    chrome_data_path: pathlib.Path,
158
    log_path: pathlib.Path,
159
    is_headless: bool,
160
) -> selenium.webdriver.chrome.options.Options:
161
    """Chrome オプションを作成する
162

163
    Args:
164
        profile_name: プロファイル名
165
        chrome_data_path: Chrome データディレクトリのパス
166
        log_path: ログディレクトリのパス
167
        is_headless: ヘッドレスモードで起動するか
168

169
    Returns:
170
        設定済みの Chrome オプション
171

172
    """
173
    options = selenium.webdriver.chrome.options.Options()
×
174

175
    if is_headless:
×
176
        options.add_argument("--headless=new")
×
177

178
    options.add_argument("--no-sandbox")  # for Docker
×
179
    options.add_argument("--disable-dev-shm-usage")  # for Docker
×
180
    options.add_argument("--disable-gpu")
×
181

182
    options.add_argument("--disable-popup-blocking")
×
183
    options.add_argument("--disable-plugins")
×
184

185
    options.add_argument("--no-first-run")
×
186

187
    options.add_argument("--lang=ja-JP")
×
188
    options.add_argument("--window-size=1920,1080")
×
189

190
    # NOTE: Accept-Language ヘッダーを日本語優先に設定
191
    options.add_experimental_option("prefs", {"intl.accept_languages": "ja-JP,ja,en-US,en"})
×
192

193
    options.add_argument("--user-data-dir=" + str(chrome_data_path / profile_name))
×
194

195
    options.add_argument("--enable-logging")
×
196
    options.add_argument("--v=1")
×
197

198
    chrome_log_file = log_path / f"chrome_{profile_name}.log"
×
199
    options.add_argument(f"--log-file={chrome_log_file!s}")
×
200

201
    if not is_headless:
×
202
        options.add_argument("--auto-open-devtools-for-tabs")
×
203

204
    return options
×
205

206

207
def _create_driver_impl(
1✔
208
    profile_name: str,
209
    data_path: pathlib.Path,
210
    is_headless: bool,
211
    use_subprocess: bool,
212
    use_undetected: bool,
213
    stealth_mode: bool,
214
) -> WebDriver:
215
    """WebDriver を作成する内部実装
216

217
    Args:
218
        profile_name: プロファイル名
219
        data_path: データディレクトリのパス
220
        is_headless: ヘッドレスモードで起動するか
221
        use_subprocess: サブプロセスで Chrome を起動するか(undetected_chromedriver のみ)
222
        use_undetected: undetected_chromedriver を使用するか
223
        stealth_mode: ボット検出回避のための User-Agent 偽装を行うか
224

225
    Returns:
226
        WebDriver インスタンス
227

228
    """
229
    chrome_data_path = data_path / "chrome"
×
230
    log_path = data_path / "log"
×
231

232
    # NOTE: Pytest を並列実行できるようにする
233
    suffix = os.environ.get("PYTEST_XDIST_WORKER", None)
×
234
    if suffix is not None:
×
235
        profile_name += "." + suffix
×
236

237
    chrome_data_path.mkdir(parents=True, exist_ok=True)
×
238
    log_path.mkdir(parents=True, exist_ok=True)
×
239

240
    rotate_selenium_logs(log_path)
×
241

242
    options = _create_chrome_options(profile_name, chrome_data_path, log_path, is_headless)
×
243

244
    service = selenium.webdriver.chrome.service.Service(
×
245
        service_args=["--verbose", f"--log-path={str(log_path / 'webdriver.log')!s}"],
246
    )
247

248
    if use_undetected:
×
249
        import undetected_chromedriver
×
250

251
        chrome_version = _get_chrome_version()
×
252

253
        # NOTE: user_multi_procs=True は既存の chromedriver ファイルが存在することを前提としているため、
254
        # ファイルが存在しない場合(CI環境の初回実行など)は False にする
255
        uc_data_path = pathlib.Path("~/.local/share/undetected_chromedriver").expanduser()
×
256
        use_multi_procs = uc_data_path.exists() and any(uc_data_path.glob("*chromedriver*"))
×
257

258
        driver = undetected_chromedriver.Chrome(
×
259
            service=service,
260
            options=options,
261
            use_subprocess=use_subprocess,
262
            version_main=chrome_version,
263
            user_multi_procs=use_multi_procs,
264
        )
265
    else:
266
        driver = selenium.webdriver.Chrome(
×
267
            service=service,
268
            options=options,
269
        )
270

271
    driver.set_page_load_timeout(30)
×
272

273
    # CDP を使って日本語ロケールを強制設定
274
    set_japanese_locale(driver)
×
275

276
    # ボット検出回避のための設定を適用(オプション)
277
    if stealth_mode:
×
278
        set_stealth_mode(driver)
×
279

280
    return driver
×
281

282

283
def create_driver(
1✔
284
    profile_name: str,
285
    data_path: pathlib.Path,
286
    is_headless: bool = True,
287
    clean_profile: bool = False,
288
    auto_recover: bool = True,
289
    use_undetected: bool = True,
290
    use_subprocess: bool = False,
291
    stealth_mode: bool = True,
292
) -> WebDriver:
293
    """Chrome WebDriver を作成する
294

295
    Args:
296
        profile_name: プロファイル名
297
        data_path: データディレクトリのパス
298
        is_headless: ヘッドレスモードで起動するか
299
        clean_profile: 起動前にロックファイルを削除するか
300
        auto_recover: プロファイル破損時に自動リカバリするか
301
        use_undetected: undetected_chromedriver を使用するか(デフォルト: True)
302
        use_subprocess: サブプロセスで Chrome を起動するか(undetected_chromedriver のみ)
303
        stealth_mode: ボット検出回避のための User-Agent 偽装を行うか(デフォルト: True)
304

305
    Raises:
306
        ValueError: use_undetected=False かつ use_subprocess=True の場合
307

308
    """
309
    if not use_undetected and use_subprocess:
×
310
        raise ValueError("use_subprocess=True は use_undetected=True の場合のみ有効です")
×
311

312
    # NOTE: ルートロガーの出力レベルを変更した場合でも Selenium 関係は抑制する
313
    logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)
×
314
    logging.getLogger("selenium.webdriver.common.selenium_manager").setLevel(logging.WARNING)
×
315
    logging.getLogger("selenium.webdriver.remote.remote_connection").setLevel(logging.WARNING)
×
316

317
    # NOTE: chrome_util の内部関数を使用(同一パッケージ内での使用は許容)
318
    actual_profile_name = my_lib.chrome_util._get_actual_profile_name(profile_name)
×
319
    profile_path = data_path / "chrome" / actual_profile_name
×
320

321
    # プロファイル健全性チェック
322
    health = my_lib.chrome_util._check_profile_health(profile_path)
×
323
    if not health.is_healthy:
×
324
        logging.warning("Profile health check failed: %s", ", ".join(health.errors))
×
325

326
        if health.has_lock_files and not (health.has_corrupted_json or health.has_corrupted_db):
×
327
            # ロックファイルのみの問題なら削除して続行
328
            logging.info("Cleaning up lock files only")
×
329
            my_lib.chrome_util._cleanup_profile_lock(profile_path)
×
330
        elif auto_recover and (health.has_corrupted_json or health.has_corrupted_db):
×
331
            # JSON または DB が破損している場合はプロファイルをリカバリ
332
            logging.warning("Profile is corrupted, attempting recovery")
×
333
            if my_lib.chrome_util._recover_corrupted_profile(profile_path):
×
334
                logging.info("Profile recovery successful, will create new profile")
×
335
            else:
336
                logging.error("Profile recovery failed")
×
337

338
    if clean_profile:
×
339
        my_lib.chrome_util._cleanup_profile_lock(profile_path)
×
340

341
    # NOTE: 1回だけ自動リトライ
342
    try:
×
343
        return _create_driver_impl(
×
344
            profile_name, data_path, is_headless, use_subprocess, use_undetected, stealth_mode
345
        )
346
    except Exception as e:
×
347
        logging.warning("First attempt to create driver failed: %s", e)
×
348

349
        # コンテナ内で実行中の場合のみ、残った Chrome プロセスをクリーンアップ
350
        my_lib.chrome_util._cleanup_orphaned_chrome_processes_in_container()
×
351

352
        # プロファイルのロックファイルを削除
353
        my_lib.chrome_util._cleanup_profile_lock(profile_path)
×
354

355
        # 再度健全性チェック
356
        health = my_lib.chrome_util._check_profile_health(profile_path)
×
357
        if not health.is_healthy and auto_recover and (health.has_corrupted_json or health.has_corrupted_db):
×
358
            logging.warning("Profile still corrupted after first attempt, recovering")
×
359
            my_lib.chrome_util._recover_corrupted_profile(profile_path)
×
360

361
        return _create_driver_impl(
×
362
            profile_name, data_path, is_headless, use_subprocess, use_undetected, stealth_mode
363
        )
364

365

366
def xpath_exists(driver: WebDriver, xpath: str) -> bool:
1✔
367
    return len(driver.find_elements(selenium.webdriver.common.by.By.XPATH, xpath)) != 0
×
368

369

370
def get_text(
1✔
371
    driver: WebDriver,
372
    xpath: str,
373
    safe_text: str,
374
    wait: WebDriverWait[WebDriver] | None = None,
375
) -> str:
376
    if wait is not None:
×
377
        wait.until(
×
378
            selenium.webdriver.support.expected_conditions.presence_of_all_elements_located(
379
                (selenium.webdriver.common.by.By.XPATH, xpath)
380
            )
381
        )
382

383
    if len(driver.find_elements(selenium.webdriver.common.by.By.XPATH, xpath)) != 0:
×
384
        return driver.find_element(selenium.webdriver.common.by.By.XPATH, xpath).text.strip()
×
385
    else:
386
        return safe_text
×
387

388

389
def input_xpath(
1✔
390
    driver: WebDriver,
391
    xpath: str,
392
    text: str,
393
    wait: WebDriverWait[WebDriver] | None = None,
394
    is_warn: bool = True,
395
) -> bool:
396
    if wait is not None:
×
397
        wait.until(
×
398
            selenium.webdriver.support.expected_conditions.element_to_be_clickable(
399
                (selenium.webdriver.common.by.By.XPATH, xpath)
400
            )
401
        )
402
        time.sleep(0.05)
×
403

404
    if xpath_exists(driver, xpath):
×
405
        driver.find_element(selenium.webdriver.common.by.By.XPATH, xpath).send_keys(text)
×
406
        return True
×
407
    else:
408
        if is_warn:
×
409
            logging.warning("Element is not found: %s", xpath)
×
410
        return False
×
411

412

413
def click_xpath(
1✔
414
    driver: WebDriver,
415
    xpath: str,
416
    wait: WebDriverWait[WebDriver] | None = None,
417
    is_warn: bool = True,
418
    move: bool = False,
419
) -> bool:
420
    if wait is not None:
×
421
        wait.until(
×
422
            selenium.webdriver.support.expected_conditions.element_to_be_clickable(
423
                (selenium.webdriver.common.by.By.XPATH, xpath)
424
            )
425
        )
426
        time.sleep(0.05)
×
427

428
    if xpath_exists(driver, xpath):
×
429
        elem = driver.find_element(selenium.webdriver.common.by.By.XPATH, xpath)
×
430
        if move:
×
431
            action = selenium.webdriver.common.action_chains.ActionChains(driver)
×
432
            action.move_to_element(elem)
×
433
            action.perform()
×
434

435
        elem.click()
×
436
        return True
×
437
    else:
438
        if is_warn:
×
439
            logging.warning("Element is not found: %s", xpath)
×
440
        return False
×
441

442

443
def is_display(driver: WebDriver, xpath: str) -> bool:
1✔
444
    return (len(driver.find_elements(selenium.webdriver.common.by.By.XPATH, xpath)) != 0) and (
×
445
        driver.find_element(selenium.webdriver.common.by.By.XPATH, xpath).is_displayed()
446
    )
447

448

449
def random_sleep(sec: float) -> None:
1✔
450
    RATIO = 0.8
1✔
451

452
    time.sleep((sec * RATIO) + (sec * (1 - RATIO) * 2) * random.random())  # noqa: S311
1✔
453

454

455
def with_retry(
1✔
456
    func: Callable[[], T],
457
    max_retries: int = 3,
458
    delay: float = 1.0,
459
    exceptions: tuple[type[Exception], ...] = (Exception,),
460
    on_retry: Callable[[int, Exception], bool | None] | None = None,
461
) -> T:
462
    """リトライ付きで関数を実行
463

464
    全て失敗した場合は最後の例外を再スロー。
465
    呼び出し側で try/except してエラー処理を行う。
466

467
    Args:
468
        func: 実行する関数
469
        max_retries: 最大リトライ回数
470
        delay: リトライ間の待機秒数
471
        exceptions: リトライ対象の例外タプル
472
        on_retry: リトライ時のコールバック (attempt, exception)
473
            - None または True を返すとリトライを継続
474
            - False を返すとリトライを中止して例外を再スロー
475

476
    Returns:
477
        成功時は関数の戻り値
478

479
    Raises:
480
        最後の例外を再スロー
481

482
    """
483
    last_exception: Exception | None = None
1✔
484

485
    for attempt in range(max_retries):
1✔
486
        try:
1✔
487
            return func()
1✔
488
        except exceptions as e:
1✔
489
            last_exception = e
1✔
490
            if attempt < max_retries - 1:
1✔
491
                if on_retry:
1✔
492
                    should_continue = on_retry(attempt + 1, e)
1✔
493
                    if should_continue is False:
1✔
494
                        raise
1✔
495
                time.sleep(delay)
1✔
496

497
    if last_exception:
1✔
498
        raise last_exception
1✔
499
    raise RuntimeError("Unexpected state in with_retry")
×
500

501

502
def with_session_retry(
1✔
503
    func: Callable[[], T],
504
    driver_name: str,
505
    data_dir: pathlib.Path,
506
    *,
507
    max_retries: int = 1,
508
    clear_profile_on_error: bool = True,
509
    on_retry: Callable[[int, int], None] | None = None,
510
    before_retry: Callable[[], None] | None = None,
511
) -> T:
512
    """InvalidSessionIdException 発生時にプロファイル削除してリトライ
513

514
    ブラウザのセッションが無効になった場合(クラッシュ等)、
515
    プロファイルを削除して再起動することで復旧を試みる。
516

517
    Args:
518
        func: 実行する関数
519
        driver_name: Chrome プロファイル名
520
        data_dir: Selenium データディレクトリ
521
        max_retries: 最大リトライ回数 (デフォルト: 1)
522
        clear_profile_on_error: エラー時にプロファイルを削除するか
523
        on_retry: リトライ時のコールバック (attempt, max_retries) - ステータス更新用
524
        before_retry: リトライ前のコールバック - quit_selenium() 呼び出し用
525

526
    Returns:
527
        成功時は関数の戻り値
528

529
    Raises:
530
        InvalidSessionIdException: リトライ上限に達した場合
531

532
    """
533

534
    def retry_handler(attempt: int, exception: Exception) -> bool | None:
1✔
535
        if before_retry:
1✔
536
            before_retry()
1✔
537

538
        if attempt <= max_retries and clear_profile_on_error:
1✔
539
            logging.warning(
1✔
540
                "セッションエラーが発生しました。プロファイルを削除してリトライします(%d/%d)",
541
                attempt,
542
                max_retries,
543
            )
544
            if on_retry:
1✔
545
                on_retry(attempt, max_retries)
1✔
546
            my_lib.chrome_util.delete_profile(driver_name, data_dir)
1✔
547
            return True
1✔
548
        return False
1✔
549

550
    return with_retry(
1✔
551
        func,
552
        max_retries=max_retries + 1,
553
        delay=0,
554
        exceptions=(selenium.common.exceptions.InvalidSessionIdException,),
555
        on_retry=retry_handler,
556
    )
557

558

559
def wait_patiently(
1✔
560
    driver: WebDriver,
561
    wait: WebDriverWait[WebDriver],
562
    target: Any,
563
) -> None:
564
    error: selenium.common.exceptions.TimeoutException | None = None
×
565
    for i in range(WAIT_RETRY_COUNT + 1):
×
566
        try:
×
567
            wait.until(target)
×
568
            return
×
569
        except selenium.common.exceptions.TimeoutException as e:
×
570
            logging.warning(
×
571
                "タイムアウトが発生しました。(%s in %s line %d)",
572
                inspect.stack()[1].function,
573
                inspect.stack()[1].filename,
574
                inspect.stack()[1].lineno,
575
            )
576
            error = e
×
577

578
            logging.info(i)
×
579
            if i != WAIT_RETRY_COUNT:
×
580
                logging.info("refresh")
×
581
                driver.refresh()
×
582

583
    if error is not None:
×
584
        raise error
×
585

586

587
def dump_page(
1✔
588
    driver: WebDriver,
589
    index: int,
590
    dump_path: pathlib.Path,
591
    stack_index: int = 1,
592
) -> None:
593
    name = inspect.stack()[stack_index].function.replace("<", "").replace(">", "")
×
594

595
    dump_path.mkdir(parents=True, exist_ok=True)
×
596

597
    png_path = dump_path / f"{name}_{index:02d}.png"
×
598
    htm_path = dump_path / f"{name}_{index:02d}.htm"
×
599

600
    driver.save_screenshot(str(png_path))
×
601

602
    with htm_path.open("w", encoding="utf-8") as f:
×
603
        f.write(driver.page_source)
×
604

605
    logging.info(
×
606
        "page dump: %02d from %s in %s line %d",
607
        index,
608
        inspect.stack()[stack_index].function,
609
        inspect.stack()[stack_index].filename,
610
        inspect.stack()[stack_index].lineno,
611
    )
612

613

614
def clear_cache(driver: WebDriver) -> None:
1✔
615
    driver.execute_cdp_cmd("Network.clearBrowserCache", {})
×
616

617

618
def set_japanese_locale(driver: WebDriver) -> None:
1✔
619
    """CDP を使って日本語ロケールを強制設定
620

621
    Chrome の起動オプションだけでは言語設定が変わってしまうことがあるため、
622
    CDP を使って Accept-Language ヘッダーとロケールを強制的に日本語に設定する。
623
    """
624
    try:
×
625
        # NOTE: Network.setExtraHTTPHeaders は Network.enable を先に呼ばないと機能しない
626
        driver.execute_cdp_cmd("Network.enable", {})
×
627
        driver.execute_cdp_cmd(
×
628
            "Network.setExtraHTTPHeaders",
629
            {"headers": {"Accept-Language": "ja-JP,ja;q=0.9"}},
630
        )
631
        driver.execute_cdp_cmd(
×
632
            "Emulation.setLocaleOverride",
633
            {"locale": "ja-JP"},
634
        )
635
        logging.debug("Japanese locale set via CDP")
×
636
    except Exception:
×
637
        logging.warning("Failed to set Japanese locale via CDP")
×
638

639

640
def _get_stealth_user_agent(driver: WebDriver) -> str:
1✔
641
    """ブラウザの User-Agent を取得し、ボット検出回避用に修正.
642

643
    - OS 部分を Windows に変更
644
    - HeadlessChrome を Chrome に変更
645

646
    Args:
647
        driver: WebDriver インスタンス
648

649
    Returns:
650
        修正された User-Agent 文字列
651

652
    """
653
    original_ua = driver.execute_script("return navigator.userAgent")
×
654
    logging.debug("Original User-Agent: %s", original_ua)
×
655

656
    # OS 部分を Windows に変更
657
    pattern = r"\([^)]*(?:Linux|Macintosh|X11)[^)]*\)"
×
658
    replacement = "(Windows NT 10.0; Win64; x64)"
×
659
    modified_ua = re.sub(pattern, replacement, original_ua)
×
660

661
    # HeadlessChrome を Chrome に変更
662
    modified_ua = modified_ua.replace("HeadlessChrome", "Chrome")
×
663

664
    logging.debug("Modified User-Agent: %s", modified_ua)
×
665
    return modified_ua
×
666

667

668
def set_stealth_mode(driver: WebDriver) -> None:
1✔
669
    """ボット検出回避のための CDP 設定を適用.
670

671
    ヨドバシ等のボット検出は User-Agent をチェックしているため、
672
    OS を Windows に偽装し、HeadlessChrome を Chrome に変更することで回避できる。
673

674
    Args:
675
        driver: WebDriver インスタンス
676

677
    """
678
    try:
×
679
        modified_ua = _get_stealth_user_agent(driver)
×
680
        driver.execute_cdp_cmd(
×
681
            "Network.setUserAgentOverride",
682
            {
683
                "userAgent": modified_ua,
684
                "acceptLanguage": "ja-JP,ja;q=0.9,en-US;q=0.8,en;q=0.7",
685
                "platform": "Win32",
686
            },
687
        )
688
        logging.debug("Stealth mode enabled (User-Agent override applied)")
×
689
    except Exception:
×
690
        logging.warning("Failed to enable stealth mode")
×
691

692

693
def clean_dump(dump_path: pathlib.Path, keep_days: int = 1) -> None:
1✔
694
    if not dump_path.exists():
1✔
695
        return
1✔
696

697
    time_threshold = datetime.timedelta(keep_days)
1✔
698

699
    for item in dump_path.iterdir():
1✔
700
        if not item.is_file():
1✔
701
            continue
1✔
702
        try:
1✔
703
            time_diff = datetime.datetime.now(datetime.UTC) - datetime.datetime.fromtimestamp(
1✔
704
                item.stat().st_mtime, datetime.UTC
705
            )
706
        except FileNotFoundError:
×
707
            # ファイルが別プロセスにより削除された場合(SQLiteの一時ファイルなど)
708
            continue
×
709
        if time_diff > time_threshold:
1✔
710
            logging.warning("remove %s [%s day(s) old].", item.absolute(), f"{time_diff.days:,}")
1✔
711

712
            item.unlink(missing_ok=True)
1✔
713

714

715
def get_memory_info(driver: WebDriver) -> dict[str, Any]:
1✔
716
    """ブラウザのメモリ使用量を取得(単位: KB)"""
717
    total_bytes = subprocess.Popen(  # noqa: S602
×
718
        "smem -t -c pss -P chrome | tail -n 1",  # noqa: S607
719
        shell=True,
720
        stdout=subprocess.PIPE,
721
    ).communicate()[0]
722
    total = int(str(total_bytes, "utf-8").strip())  # smem の出力は KB 単位
×
723

724
    try:
×
725
        memory_info = driver.execute_cdp_cmd("Memory.getAllTimeSamplingProfile", {})
×
726
        heap_usage = driver.execute_cdp_cmd("Runtime.getHeapUsage", {})
×
727

728
        heap_used = heap_usage.get("usedSize", 0) // 1024  # bytes → KB
×
729
        heap_total = heap_usage.get("totalSize", 0) // 1024  # bytes → KB
×
730
    except Exception as e:
×
731
        logging.debug("Failed to get memory usage: %s", e)
×
732

733
        memory_info = None
×
734
        heap_used = 0
×
735
        heap_total = 0
×
736

737
    return {
×
738
        "total": total,
739
        "heap_used": heap_used,
740
        "heap_total": heap_total,
741
        "memory_info": memory_info,
742
    }
743

744

745
def log_memory_usage(driver: WebDriver) -> None:
1✔
746
    mem_info = get_memory_info(driver)
×
747
    logging.info(
×
748
        "Chrome memory: %s MB (JS heap: %s MB)",
749
        f"""{mem_info["total"] // 1024:,}""",
750
        f"""{mem_info["heap_used"] // 1024:,}""",
751
    )
752

753

754
def _warmup(
1✔
755
    driver: WebDriver,
756
    keyword: str,
757
    url_pattern: str,
758
    sleep_sec: int = 3,
759
) -> None:
760
    # NOTE: ダミーアクセスを行って BOT ではないと思わせる。(効果なさそう...)
761
    driver.get("https://www.yahoo.co.jp/")
×
762
    time.sleep(sleep_sec)
×
763

764
    driver.find_element(selenium.webdriver.common.by.By.XPATH, '//input[@name="p"]').send_keys(keyword)
×
765
    driver.find_element(selenium.webdriver.common.by.By.XPATH, '//input[@name="p"]').send_keys(
×
766
        selenium.webdriver.common.keys.Keys.ENTER
767
    )
768

769
    time.sleep(sleep_sec)
×
770

771
    driver.find_element(
×
772
        selenium.webdriver.common.by.By.XPATH, f'//a[contains(@href, "{url_pattern}")]'
773
    ).click()
774

775
    time.sleep(sleep_sec)
×
776

777

778
class browser_tab:
1✔
779
    """新しいブラウザタブで URL を開くコンテキストマネージャ"""
780

781
    def __init__(self, driver: WebDriver, url: str) -> None:
1✔
782
        """初期化
783

784
        Args:
785
            driver: WebDriver インスタンス
786
            url: 開く URL
787

788
        """
789
        self.driver = driver
1✔
790
        self.url = url
1✔
791
        self.original_window: str | None = None
1✔
792

793
    def __enter__(self) -> None:
1✔
794
        """新しいタブを開いて URL にアクセス"""
795
        self.original_window = self.driver.current_window_handle
1✔
796
        self.driver.execute_script("window.open('');")
1✔
797
        self.driver.switch_to.window(self.driver.window_handles[-1])
1✔
798
        try:
1✔
799
            self.driver.get(self.url)
1✔
800
        except Exception:
×
801
            # NOTE: URL読み込みに失敗した場合もクリーンアップしてから例外を再送出
802
            self._cleanup()
×
803
            raise
×
804

805
    def _cleanup(self) -> None:
1✔
806
        """タブを閉じて元のウィンドウに戻る"""
807
        try:
1✔
808
            # 余分なタブを閉じる
809
            while len(self.driver.window_handles) > 1:
1✔
810
                self.driver.switch_to.window(self.driver.window_handles[-1])
1✔
811
                self.driver.close()
1✔
812
            if self.original_window is not None:
1✔
813
                self.driver.switch_to.window(self.original_window)
1✔
814
            time.sleep(0.1)
1✔
815
        except Exception:
1✔
816
            # NOTE: Chromeがクラッシュした場合は無視(既に終了しているため操作不可)
817
            logging.exception("タブのクリーンアップに失敗しました(Chromeがクラッシュした可能性があります)")
1✔
818

819
    def _recover_from_error(self) -> None:
1✔
820
        """エラー後にブラウザの状態を回復する"""
821
        try:
×
822
            # ページロードタイムアウトをリセット(負の値になっている可能性があるため)
823
            self.driver.set_page_load_timeout(30)
×
824

825
            # about:blank に移動してレンダラーの状態をリセット
826
            self.driver.get("about:blank")
×
827
            time.sleep(0.5)
×
828
        except Exception:
×
829
            logging.warning("ブラウザの回復に失敗しました")
×
830

831
    def __exit__(
1✔
832
        self,
833
        exception_type: type[BaseException] | None,
834
        exception_value: BaseException | None,
835
        traceback: types.TracebackType | None,
836
    ) -> None:
837
        """タブを閉じて元のウィンドウに戻る"""
838
        self._cleanup()
1✔
839

840
        # 例外が発生した場合はブラウザの状態を回復
841
        if exception_type is not None:
1✔
842
            self._recover_from_error()
×
843

844

845
class error_handler:
1✔
846
    """Selenium操作時のエラーハンドリング用コンテキストマネージャ
847

848
    エラー発生時に自動でログ出力、スクリーンショット取得、コールバック呼び出しを行う。
849

850
    Args:
851
        driver: WebDriver インスタンス
852
        message: ログに出力するエラーメッセージ
853
        on_error: エラー時に呼ばれるコールバック関数
854
            (exception, screenshot: PIL.Image.Image | None, page_source: str | None) -> None
855
        capture_screenshot: スクリーンショットを自動取得するか(デフォルト: True)
856
        reraise: 例外を再送出するか(デフォルト: True)
857

858
    Attributes:
859
        exception: 発生した例外(エラーがなければ None)
860
        screenshot: 取得したスクリーンショット(PIL.Image.Image、取得失敗時は None)
861
        page_source: 取得したページソース(取得失敗時は None)
862

863
    Examples:
864
        基本的な使用方法::
865

866
            with my_lib.selenium_util.error_handler(driver, message="ログイン処理に失敗") as handler:
867
                driver.get(login_url)
868
                driver.find_element(...).click()
869

870
        コールバック付き(Slack通知など)::
871

872
            def notify(exc, screenshot, page_source):
873
                slack.notify_error_with_page(config, "エラー発生", exc, screenshot, page_source)
874

875
            with my_lib.selenium_util.error_handler(
876
                driver,
877
                message="クロール処理に失敗",
878
                on_error=notify,
879
            ):
880
                crawl_page(driver)
881

882
        例外を抑制して続行::
883

884
            with my_lib.selenium_util.error_handler(driver, reraise=False) as handler:
885
                risky_operation()
886

887
            if handler.exception:
888
                logging.warning("処理をスキップしました")
889

890
    """
891

892
    def __init__(
1✔
893
        self,
894
        driver: WebDriver,
895
        message: str = "Selenium operation failed",
896
        on_error: Callable[[Exception, PIL.Image.Image | None, str | None], None] | None = None,
897
        capture_screenshot: bool = True,
898
        reraise: bool = True,
899
    ) -> None:
900
        """初期化"""
901
        self.driver = driver
1✔
902
        self.message = message
1✔
903
        self.on_error = on_error
1✔
904
        self.capture_screenshot = capture_screenshot
1✔
905
        self.reraise = reraise
1✔
906
        self.exception: Exception | None = None
1✔
907
        self.screenshot: PIL.Image.Image | None = None
1✔
908
        self.page_source: str | None = None
1✔
909

910
    def __enter__(self) -> Self:
1✔
911
        """コンテキストマネージャの開始"""
912
        return self
1✔
913

914
    def __exit__(
1✔
915
        self,
916
        exception_type: type[BaseException] | None,
917
        exception_value: BaseException | None,
918
        traceback: types.TracebackType | None,
919
    ) -> bool:
920
        """コンテキストマネージャの終了、エラー処理を実行"""
921
        if exception_value is None:
1✔
922
            return False
1✔
923

924
        # 例外を記録
925
        if isinstance(exception_value, Exception):
1✔
926
            self.exception = exception_value
1✔
927
        else:
928
            # BaseException(KeyboardInterrupt など)は処理せず再送出
929
            return False
×
930

931
        # ログ出力
932
        logging.exception(self.message)
1✔
933

934
        # スクリーンショット・ページソース取得
935
        if self.capture_screenshot:
1✔
936
            try:
1✔
937
                self.page_source = self.driver.page_source
1✔
938
            except Exception:
×
939
                logging.debug("Failed to capture page source for error handling")
×
940
            try:
1✔
941
                screenshot_bytes = self.driver.get_screenshot_as_png()
1✔
942
                self.screenshot = PIL.Image.open(io.BytesIO(screenshot_bytes))
1✔
943
            except Exception:
1✔
944
                logging.debug("Failed to capture screenshot for error handling")
1✔
945

946
        # コールバック呼び出し
947
        if self.on_error is not None:
1✔
948
            try:
1✔
949
                self.on_error(self.exception, self.screenshot, self.page_source)
1✔
950
            except Exception:
×
951
                logging.exception("Error in on_error callback")
×
952

953
        # reraise=False なら例外を抑制
954
        return not self.reraise
1✔
955

956

957
def _is_chrome_related_process(process: psutil.Process) -> bool:
1✔
958
    """プロセスがChrome関連かどうかを判定"""
959
    try:
1✔
960
        process_name = process.name().lower()
1✔
961
        # Chrome関連のプロセス名パターン
962
        chrome_patterns = ["chrome", "chromium", "google-chrome", "undetected_chro"]
1✔
963
        # chromedriverは除外
964
        if "chromedriver" in process_name:
1✔
965
            return False
1✔
966
        return any(pattern in process_name for pattern in chrome_patterns)
1✔
967
    except (psutil.NoSuchProcess, psutil.AccessDenied):
1✔
968
        return False
1✔
969

970

971
def _get_chrome_processes_by_pgid(chromedriver_pid: int, existing_pids: set[int]) -> list[int]:
1✔
972
    """プロセスグループIDで追加のChrome関連プロセスを取得"""
973
    additional_pids = []
×
974
    try:
×
975
        pgid = os.getpgid(chromedriver_pid)
×
976
        for proc in psutil.process_iter(["pid", "name", "ppid"]):
×
977
            if proc.info["pid"] in existing_pids:
×
978
                continue
×
979
            try:
×
980
                if os.getpgid(proc.info["pid"]) == pgid:
×
981
                    proc_obj = psutil.Process(proc.info["pid"])
×
982
                    if _is_chrome_related_process(proc_obj):
×
983
                        additional_pids.append(proc.info["pid"])
×
984
                        logging.debug(
×
985
                            "Found Chrome-related process by pgid: PID %d, name: %s",
986
                            proc.info["pid"],
987
                            proc.info["name"],
988
                        )
989
            except (psutil.NoSuchProcess, psutil.AccessDenied, OSError):
×
990
                pass
×
991
    except (OSError, psutil.NoSuchProcess):
×
992
        logging.debug("Failed to get process group ID for chromedriver")
×
993
    return additional_pids
×
994

995

996
def _get_chrome_related_processes(driver: WebDriver) -> list[int]:
1✔
997
    """Chrome関連の全子プロセスを取得
998

999
    undetected_chromedriver 使用時、Chrome プロセスは chromedriver の子ではなく
1000
    Python プロセスの直接の子として起動されることがあるため、両方を検索する。
1001
    """
1002
    chrome_pids = set()
×
1003

1004
    # 1. driver.service.process の子プロセスを検索
1005
    # NOTE: Chrome/Firefox WebDriver には service 属性があるが、型スタブでは定義されていない
1006
    try:
×
1007
        if hasattr(driver, "service") and driver.service and hasattr(driver.service, "process"):
×
1008
            process = driver.service.process
×
1009
            if process and hasattr(process, "pid"):
×
1010
                chromedriver_pid = process.pid
×
1011

1012
                # psutilでプロセス階層を取得
1013
                parent_process = psutil.Process(chromedriver_pid)
×
1014
                children = parent_process.children(recursive=True)
×
1015

1016
                for child in children:
×
1017
                    chrome_pids.add(child.pid)
×
1018
                    logging.debug(
×
1019
                        "Found Chrome-related process (service child): PID %d, name: %s",
1020
                        child.pid,
1021
                        child.name(),
1022
                    )
1023
    except Exception:
×
1024
        logging.exception("Failed to get Chrome-related processes from service")
×
1025

1026
    # 2. 現在の Python プロセスの全子孫から Chrome 関連プロセスを検索
1027
    try:
×
1028
        current_process = psutil.Process()
×
1029
        all_children = current_process.children(recursive=True)
×
1030

1031
        for child in all_children:
×
1032
            if child.pid in chrome_pids:
×
1033
                continue
×
1034
            try:
×
1035
                if _is_chrome_related_process(child):
×
1036
                    chrome_pids.add(child.pid)
×
1037
                    logging.debug(
×
1038
                        "Found Chrome-related process (python child): PID %d, name: %s",
1039
                        child.pid,
1040
                        child.name(),
1041
                    )
1042
            except (psutil.NoSuchProcess, psutil.AccessDenied):
×
1043
                pass
×
1044
    except Exception:
×
1045
        logging.exception("Failed to get Chrome-related processes from python children")
×
1046

1047
    return list(chrome_pids)
×
1048

1049

1050
def _send_signal_to_processes(pids: list[int], sig: signal.Signals, signal_name: str) -> None:
1✔
1051
    """プロセスリストに指定されたシグナルを送信"""
1052
    errors = []
×
1053
    for pid in pids:
×
1054
        try:
×
1055
            # プロセス名を取得
1056
            try:
×
1057
                process = psutil.Process(pid)
×
1058
                process_name = process.name()
×
1059
            except (psutil.NoSuchProcess, psutil.AccessDenied):
×
1060
                process_name = "unknown"
×
1061

1062
            if sig == signal.SIGKILL:
×
1063
                # プロセスがまだ存在するかチェック
1064
                os.kill(pid, 0)  # シグナル0は存在確認
×
1065
            os.kill(pid, sig)
×
1066
            logging.info("Sent %s to process: PID %d (%s)", signal_name, pid, process_name)
×
1067
        except (ProcessLookupError, OSError) as e:
×
1068
            # プロセスが既に終了している場合は無視
1069
            errors.append((pid, e))
×
1070

1071
    # エラーが発生した場合はまとめてログ出力
1072
    if errors:
×
1073
        logging.debug("Failed to send %s to some processes: %s", signal_name, errors)
×
1074

1075

1076
def _terminate_chrome_processes(chrome_pids: list[int], timeout: float = 5.0) -> None:
1✔
1077
    """Chrome関連プロセスを段階的に終了
1078

1079
    Args:
1080
        chrome_pids: 終了対象のプロセスIDリスト
1081
        timeout: SIGTERM後にプロセス終了を待機する最大時間(秒)
1082

1083
    """
1084
    if not chrome_pids:
×
1085
        return
×
1086

1087
    # 優雅な終了(SIGTERM)
1088
    _send_signal_to_processes(chrome_pids, signal.SIGTERM, "SIGTERM")
×
1089

1090
    # プロセスの終了を待機(ポーリング)
1091
    remaining_pids = list(chrome_pids)
×
1092
    poll_interval = 0.2
×
1093
    elapsed = 0.0
×
1094

1095
    while remaining_pids and elapsed < timeout:
×
1096
        time.sleep(poll_interval)
×
1097
        elapsed += poll_interval
×
1098

1099
        # まだ生存しているプロセスをチェック
1100
        still_alive = []
×
1101
        for pid in remaining_pids:
×
1102
            try:
×
1103
                if psutil.pid_exists(pid):
×
1104
                    process = psutil.Process(pid)
×
1105
                    if process.is_running() and process.status() != psutil.STATUS_ZOMBIE:
×
1106
                        still_alive.append(pid)
×
1107
            except (psutil.NoSuchProcess, psutil.AccessDenied):
×
1108
                pass
×
1109

1110
        remaining_pids = still_alive
×
1111

1112
    # タイムアウト後もまだ残っているプロセスにのみ SIGKILL を送信
1113
    if remaining_pids:
×
1114
        logging.warning(
×
1115
            "Chrome processes still alive after %.1fs, sending SIGKILL to %d processes",
1116
            elapsed,
1117
            len(remaining_pids),
1118
        )
1119
        _send_signal_to_processes(remaining_pids, signal.SIGKILL, "SIGKILL")
×
1120

1121

1122
def _reap_single_process(pid: int) -> None:
1✔
1123
    """単一プロセスをwaitpidで回収"""
1124
    try:
×
1125
        # ノンブロッキングでwaitpid
1126
        result_pid, status = os.waitpid(pid, os.WNOHANG)
×
1127
        if result_pid == pid:
×
1128
            # プロセス名を取得
1129
            try:
×
1130
                process = psutil.Process(pid)
×
1131
                process_name = process.name()
×
1132
            except (psutil.NoSuchProcess, psutil.AccessDenied):
×
1133
                process_name = "unknown"
×
1134
            logging.debug("Reaped Chrome process: PID %d (%s)", pid, process_name)
×
1135
    except (ChildProcessError, OSError):
×
1136
        # 子プロセスでない場合や既に回収済みの場合は無視
1137
        pass
×
1138

1139

1140
def _reap_chrome_processes(chrome_pids: list[int]) -> None:
1✔
1141
    """Chrome関連プロセスを明示的に回収してゾンビ化を防ぐ"""
1142
    for pid in chrome_pids:
×
1143
        _reap_single_process(pid)
×
1144

1145

1146
def _get_remaining_chrome_pids(chrome_pids: list[int]) -> list[int]:
1✔
1147
    """指定されたPIDリストから、まだ生存しているChrome関連プロセスを取得"""
1148
    remaining = []
×
1149
    for pid in chrome_pids:
×
1150
        try:
×
1151
            if psutil.pid_exists(pid):
×
1152
                process = psutil.Process(pid)
×
1153
                if (
×
1154
                    process.is_running()
1155
                    and process.status() != psutil.STATUS_ZOMBIE
1156
                    and _is_chrome_related_process(process)
1157
                ):
1158
                    remaining.append(pid)
×
1159
        except (psutil.NoSuchProcess, psutil.AccessDenied):
×
1160
            pass
×
1161
    return remaining
×
1162

1163

1164
def _wait_for_processes_with_check(
1✔
1165
    chrome_pids: list[int],
1166
    timeout: float,
1167
    poll_interval: float = 0.2,
1168
    log_interval: float = 1.0,
1169
) -> list[int]:
1170
    """プロセスの終了を待機しつつ、残存プロセスをチェック
1171

1172
    Args:
1173
        chrome_pids: 監視対象のプロセスIDリスト
1174
        timeout: 最大待機時間(秒)
1175
        poll_interval: チェック間隔(秒)
1176
        log_interval: ログ出力間隔(秒)
1177

1178
    Returns:
1179
        タイムアウト後も残存しているプロセスIDのリスト
1180

1181
    """
1182
    elapsed = 0.0
×
1183
    last_log_time = 0.0
×
1184
    remaining_pids = list(chrome_pids)
×
1185

1186
    while remaining_pids and elapsed < timeout:
×
1187
        time.sleep(poll_interval)
×
1188
        elapsed += poll_interval
×
1189
        remaining_pids = _get_remaining_chrome_pids(remaining_pids)
×
1190

1191
        if remaining_pids and (elapsed - last_log_time) >= log_interval:
×
1192
            logging.info(
×
1193
                "Found %d remaining Chrome processes after %.0fs",
1194
                len(remaining_pids),
1195
                elapsed,
1196
            )
1197
            last_log_time = elapsed
×
1198

1199
    return remaining_pids
×
1200

1201

1202
def quit_driver_gracefully(
1✔
1203
    driver: WebDriver | None,
1204
    wait_sec: float = 5.0,
1205
    sigterm_wait_sec: float = 5.0,
1206
    sigkill_wait_sec: float = 5.0,
1207
) -> None:
1208
    """Chrome WebDriverを確実に終了する
1209

1210
    終了フロー:
1211
    1. driver.quit() を呼び出し
1212
    2. wait_sec 秒待機しつつプロセス終了をチェック
1213
    3. 残存プロセスがあれば SIGTERM を送信
1214
    4. sigterm_wait_sec 秒待機しつつプロセス終了をチェック
1215
    5. 残存プロセスがあれば SIGKILL を送信
1216
    6. sigkill_wait_sec 秒待機
1217

1218
    Args:
1219
        driver: 終了する WebDriver インスタンス
1220
        wait_sec: quit 後にプロセス終了を待機する秒数(デフォルト: 5秒)
1221
        sigterm_wait_sec: SIGTERM 後にプロセス終了を待機する秒数(デフォルト: 5秒)
1222
        sigkill_wait_sec: SIGKILL 後にプロセス回収を待機する秒数(デフォルト: 5秒)
1223

1224
    """
1225
    if driver is None:
1✔
1226
        return
1✔
1227

1228
    # quit前にChrome関連プロセスを記録
1229
    chrome_pids_before = _get_chrome_related_processes(driver)
1✔
1230

1231
    try:
1✔
1232
        # WebDriverの正常終了を試行(これがタブのクローズも含む)
1233
        driver.quit()
1✔
1234
        logging.info("WebDriver quit successfully")
1✔
1235
    except Exception:
1✔
1236
        logging.warning("Failed to quit driver normally", exc_info=True)
1✔
1237
    finally:
1238
        # undetected_chromedriver の __del__ がシャットダウン時に再度呼ばれるのを防ぐ
1239
        if hasattr(driver, "_has_quit"):
1✔
1240
            driver._has_quit = True  # type: ignore[attr-defined]
1✔
1241

1242
    # ChromeDriverサービスの停止を試行
1243
    try:
1✔
1244
        if hasattr(driver, "service") and driver.service and hasattr(driver.service, "stop"):
1✔
1245
            driver.service.stop()  # type: ignore[call-non-callable]
×
1246
    except (ConnectionResetError, OSError):
×
1247
        # Chrome が既に終了している場合は無視
1248
        logging.debug("Chrome service already stopped")
×
1249
    except Exception:
×
1250
        logging.warning("Failed to stop Chrome service", exc_info=True)
×
1251

1252
    # Step 1: quit 後に wait_sec 秒待機しつつプロセス終了をチェック
1253
    remaining_pids = _wait_for_processes_with_check(chrome_pids_before, wait_sec)
1✔
1254

1255
    if not remaining_pids:
1✔
1256
        logging.debug("All Chrome processes exited normally")
1✔
1257
        return
1✔
1258

1259
    # Step 2: 残存プロセスに SIGTERM を送信
1260
    logging.info(
×
1261
        "Found %d remaining Chrome processes after %.0fs, sending SIGTERM",
1262
        len(remaining_pids),
1263
        wait_sec,
1264
    )
1265
    _send_signal_to_processes(remaining_pids, signal.SIGTERM, "SIGTERM")
×
1266

1267
    # Step 3: SIGTERM 後に sigterm_wait_sec 秒待機しつつプロセス終了をチェック
1268
    remaining_pids = _wait_for_processes_with_check(remaining_pids, sigterm_wait_sec)
×
1269

1270
    if not remaining_pids:
×
1271
        logging.info("All Chrome processes exited after SIGTERM")
×
1272
        _reap_chrome_processes(chrome_pids_before)
×
1273
        return
×
1274

1275
    # Step 4: 残存プロセスに SIGKILL を送信
1276
    logging.warning(
×
1277
        "Chrome processes still alive after SIGTERM + %.1fs, sending SIGKILL to %d processes",
1278
        sigterm_wait_sec,
1279
        len(remaining_pids),
1280
    )
1281
    _send_signal_to_processes(remaining_pids, signal.SIGKILL, "SIGKILL")
×
1282

1283
    # Step 5: SIGKILL 後に sigkill_wait_sec 秒待機してプロセス回収
1284
    time.sleep(sigkill_wait_sec)
×
1285
    _reap_chrome_processes(chrome_pids_before)
×
1286

1287
    # 最終チェック:まだ残っているプロセスがあるか確認
1288
    still_remaining = _get_remaining_chrome_pids(remaining_pids)
×
1289

1290
    # 回収できなかったプロセスについて警告
1291
    if still_remaining:
×
1292
        for pid in still_remaining:
×
1293
            try:
×
1294
                process = psutil.Process(pid)
×
1295
                logging.warning("Failed to collect Chrome-related process: PID %d (%s)", pid, process.name())
×
1296
            except (psutil.NoSuchProcess, psutil.AccessDenied):
×
1297
                pass
×
1298

1299

1300
if __name__ == "__main__":
1301
    import pathlib
1302

1303
    import docopt
1304
    import selenium.webdriver.support.wait
1305

1306
    import my_lib.config
1307
    import my_lib.logger
1308

1309
    assert __doc__ is not None  # noqa: S101
1310
    args = docopt.docopt(__doc__)
1311

1312
    config_file = args["-c"]
1313
    debug_mode = args["-D"]
1314

1315
    my_lib.logger.init("test", level=logging.DEBUG if debug_mode else logging.INFO)
1316

1317
    config = my_lib.config.load(config_file)
1318

1319
    driver = create_driver("test", pathlib.Path(config["data"]["selenium"]))
1320
    wait = selenium.webdriver.support.wait.WebDriverWait(driver, 5)
1321

1322
    driver.get("https://www.google.com/")
1323
    wait.until(
1324
        selenium.webdriver.support.expected_conditions.presence_of_element_located(
1325
            (selenium.webdriver.common.by.By.XPATH, '//input[contains(@value, "Google")]')
1326
        )
1327
    )
1328

1329
    quit_driver_gracefully(driver)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc