• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kimata / my-py-lib / 20720271765

05 Jan 2026 03:30PM UTC coverage: 64.339% (-0.3%) from 64.594%
20720271765

push

github

kimata
fix: error_handler テストの on_error コールバック引数を修正

page_source 追加に伴い、on_error コールバックが3引数
(exc, screenshot, page_source) になったことに対応

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

3123 of 4854 relevant lines covered (64.34%)

0.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

32.57
/src/my_lib/selenium_util.py
1
#!/usr/bin/env python3
2
"""
3
Selenium を Chrome Driver を使って動かします。
4

5
Usage:
6
  selenium_util.py [-c CONFIG] [-D]
7

8
Options:
9
  -c CONFIG         : CONFIG を設定ファイルとして読み込んで実行します。
10
                      [default: tests/fixtures/config.example.yaml]
11
  -D                : デバッグモードで動作します。
12
"""
13

14
from __future__ import annotations
1✔
15

16
import datetime
1✔
17
import inspect
1✔
18
import io
1✔
19
import logging
1✔
20
import os
1✔
21
import pathlib
1✔
22
import random
1✔
23
import re
1✔
24
import signal
1✔
25
import subprocess
1✔
26
import time
1✔
27
from typing import TYPE_CHECKING, Any, Self, TypeVar
1✔
28

29
import PIL.Image
1✔
30
import psutil
1✔
31
import selenium
1✔
32
import selenium.common.exceptions
1✔
33
import selenium.webdriver.chrome.options
1✔
34
import selenium.webdriver.chrome.service
1✔
35
import selenium.webdriver.common.action_chains
1✔
36
import selenium.webdriver.common.by
1✔
37
import selenium.webdriver.common.keys
1✔
38
import selenium.webdriver.support.expected_conditions
1✔
39

40
import my_lib.chrome_util
1✔
41
import my_lib.time
1✔
42

43
if TYPE_CHECKING:
44
    import types
45
    from collections.abc import Callable
46

47
    from selenium.webdriver.remote.webdriver import WebDriver
48
    from selenium.webdriver.support.wait import WebDriverWait
49

50
T = TypeVar("T")
1✔
51

52
WAIT_RETRY_COUNT: int = 1
1✔
53

54

55
class SeleniumError(Exception):
1✔
56
    """Selenium 関連エラーの基底クラス"""
57

58

59
def _get_chrome_version() -> int | None:
1✔
60
    try:
1✔
61
        result = subprocess.run(
1✔
62
            ["google-chrome", "--version"],  # noqa: S607
63
            capture_output=True,
64
            text=True,
65
            timeout=10,
66
            check=False,
67
        )
68
        match = re.search(r"(\d+)\.", result.stdout)
1✔
69
        if match:
1✔
70
            return int(match.group(1))
1✔
71
    except Exception:
1✔
72
        logging.warning("Failed to detect Chrome version")
1✔
73
    return None
1✔
74

75

76
def _create_chrome_options(
1✔
77
    profile_name: str,
78
    chrome_data_path: pathlib.Path,
79
    log_path: pathlib.Path,
80
    is_headless: bool,
81
) -> selenium.webdriver.chrome.options.Options:
82
    """Chrome オプションを作成する
83

84
    Args:
85
        profile_name: プロファイル名
86
        chrome_data_path: Chrome データディレクトリのパス
87
        log_path: ログディレクトリのパス
88
        is_headless: ヘッドレスモードで起動するか
89

90
    Returns:
91
        設定済みの Chrome オプション
92

93
    """
94
    options = selenium.webdriver.chrome.options.Options()
×
95

96
    if is_headless:
×
97
        options.add_argument("--headless=new")
×
98

99
    options.add_argument("--no-sandbox")  # for Docker
×
100
    options.add_argument("--disable-dev-shm-usage")  # for Docker
×
101
    options.add_argument("--disable-gpu")
×
102

103
    options.add_argument("--disable-popup-blocking")
×
104
    options.add_argument("--disable-plugins")
×
105

106
    options.add_argument("--no-first-run")
×
107

108
    options.add_argument("--lang=ja-JP")
×
109
    options.add_argument("--window-size=1920,1080")
×
110

111
    # NOTE: Accept-Language ヘッダーを日本語優先に設定
112
    options.add_experimental_option("prefs", {"intl.accept_languages": "ja-JP,ja,en-US,en"})
×
113

114
    options.add_argument("--user-data-dir=" + str(chrome_data_path / profile_name))
×
115

116
    options.add_argument("--enable-logging")
×
117
    options.add_argument("--v=1")
×
118

119
    chrome_log_file = log_path / f"chrome_{profile_name}.log"
×
120
    options.add_argument(f"--log-file={chrome_log_file!s}")
×
121

122
    if not is_headless:
×
123
        options.add_argument("--auto-open-devtools-for-tabs")
×
124

125
    return options
×
126

127

128
def _create_driver_impl(
1✔
129
    profile_name: str,
130
    data_path: pathlib.Path,
131
    is_headless: bool,
132
    use_subprocess: bool,
133
    use_undetected: bool,
134
) -> WebDriver:
135
    """WebDriver を作成する内部実装
136

137
    Args:
138
        profile_name: プロファイル名
139
        data_path: データディレクトリのパス
140
        is_headless: ヘッドレスモードで起動するか
141
        use_subprocess: サブプロセスで Chrome を起動するか(undetected_chromedriver のみ)
142
        use_undetected: undetected_chromedriver を使用するか
143

144
    Returns:
145
        WebDriver インスタンス
146

147
    """
148
    chrome_data_path = data_path / "chrome"
×
149
    log_path = data_path / "log"
×
150

151
    # NOTE: Pytest を並列実行できるようにする
152
    suffix = os.environ.get("PYTEST_XDIST_WORKER", None)
×
153
    if suffix is not None:
×
154
        profile_name += "." + suffix
×
155

156
    chrome_data_path.mkdir(parents=True, exist_ok=True)
×
157
    log_path.mkdir(parents=True, exist_ok=True)
×
158

159
    options = _create_chrome_options(profile_name, chrome_data_path, log_path, is_headless)
×
160

161
    service = selenium.webdriver.chrome.service.Service(
×
162
        service_args=["--verbose", f"--log-path={str(log_path / 'webdriver.log')!s}"],
163
    )
164

165
    if use_undetected:
×
166
        import undetected_chromedriver
×
167

168
        chrome_version = _get_chrome_version()
×
169

170
        # NOTE: user_multi_procs=True は既存の chromedriver ファイルが存在することを前提としているため、
171
        # ファイルが存在しない場合(CI環境の初回実行など)は False にする
172
        uc_data_path = pathlib.Path("~/.local/share/undetected_chromedriver").expanduser()
×
173
        use_multi_procs = uc_data_path.exists() and any(uc_data_path.glob("*chromedriver*"))
×
174

175
        driver = undetected_chromedriver.Chrome(
×
176
            service=service,
177
            options=options,
178
            use_subprocess=use_subprocess,
179
            version_main=chrome_version,
180
            user_multi_procs=use_multi_procs,
181
        )
182
    else:
183
        driver = selenium.webdriver.Chrome(
×
184
            service=service,
185
            options=options,
186
        )
187

188
    driver.set_page_load_timeout(30)
×
189

190
    # CDP を使って日本語ロケールを強制設定
191
    set_japanese_locale(driver)
×
192

193
    return driver
×
194

195

196
def create_driver(
1✔
197
    profile_name: str,
198
    data_path: pathlib.Path,
199
    is_headless: bool = True,
200
    clean_profile: bool = False,
201
    auto_recover: bool = True,
202
    use_undetected: bool = True,
203
    use_subprocess: bool = False,
204
) -> WebDriver:
205
    """Chrome WebDriver を作成する
206

207
    Args:
208
        profile_name: プロファイル名
209
        data_path: データディレクトリのパス
210
        is_headless: ヘッドレスモードで起動するか
211
        clean_profile: 起動前にロックファイルを削除するか
212
        auto_recover: プロファイル破損時に自動リカバリするか
213
        use_undetected: undetected_chromedriver を使用するか(デフォルト: True)
214
        use_subprocess: サブプロセスで Chrome を起動するか(undetected_chromedriver のみ)
215

216
    Raises:
217
        ValueError: use_undetected=False かつ use_subprocess=True の場合
218

219
    """
220
    if not use_undetected and use_subprocess:
×
221
        raise ValueError("use_subprocess=True は use_undetected=True の場合のみ有効です")
×
222

223
    # NOTE: ルートロガーの出力レベルを変更した場合でも Selenium 関係は抑制する
224
    logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)
×
225
    logging.getLogger("selenium.webdriver.common.selenium_manager").setLevel(logging.WARNING)
×
226
    logging.getLogger("selenium.webdriver.remote.remote_connection").setLevel(logging.WARNING)
×
227

228
    # NOTE: chrome_util の内部関数を使用(同一パッケージ内での使用は許容)
229
    actual_profile_name = my_lib.chrome_util._get_actual_profile_name(profile_name)
×
230
    profile_path = data_path / "chrome" / actual_profile_name
×
231

232
    # プロファイル健全性チェック
233
    health = my_lib.chrome_util._check_profile_health(profile_path)
×
234
    if not health.is_healthy:
×
235
        logging.warning("Profile health check failed: %s", ", ".join(health.errors))
×
236

237
        if health.has_lock_files and not (health.has_corrupted_json or health.has_corrupted_db):
×
238
            # ロックファイルのみの問題なら削除して続行
239
            logging.info("Cleaning up lock files only")
×
240
            my_lib.chrome_util._cleanup_profile_lock(profile_path)
×
241
        elif auto_recover and (health.has_corrupted_json or health.has_corrupted_db):
×
242
            # JSON または DB が破損している場合はプロファイルをリカバリ
243
            logging.warning("Profile is corrupted, attempting recovery")
×
244
            if my_lib.chrome_util._recover_corrupted_profile(profile_path):
×
245
                logging.info("Profile recovery successful, will create new profile")
×
246
            else:
247
                logging.error("Profile recovery failed")
×
248

249
    if clean_profile:
×
250
        my_lib.chrome_util._cleanup_profile_lock(profile_path)
×
251

252
    # NOTE: 1回だけ自動リトライ
253
    try:
×
254
        return _create_driver_impl(profile_name, data_path, is_headless, use_subprocess, use_undetected)
×
255
    except Exception as e:
×
256
        logging.warning("First attempt to create driver failed: %s", e)
×
257

258
        # コンテナ内で実行中の場合のみ、残った Chrome プロセスをクリーンアップ
259
        my_lib.chrome_util._cleanup_orphaned_chrome_processes_in_container()
×
260

261
        # プロファイルのロックファイルを削除
262
        my_lib.chrome_util._cleanup_profile_lock(profile_path)
×
263

264
        # 再度健全性チェック
265
        health = my_lib.chrome_util._check_profile_health(profile_path)
×
266
        if not health.is_healthy and auto_recover and (health.has_corrupted_json or health.has_corrupted_db):
×
267
            logging.warning("Profile still corrupted after first attempt, recovering")
×
268
            my_lib.chrome_util._recover_corrupted_profile(profile_path)
×
269

270
        return _create_driver_impl(profile_name, data_path, is_headless, use_subprocess, use_undetected)
×
271

272

273
def xpath_exists(driver: WebDriver, xpath: str) -> bool:
1✔
274
    return len(driver.find_elements(selenium.webdriver.common.by.By.XPATH, xpath)) != 0
×
275

276

277
def get_text(
1✔
278
    driver: WebDriver,
279
    xpath: str,
280
    safe_text: str,
281
    wait: WebDriverWait[WebDriver] | None = None,
282
) -> str:
283
    if wait is not None:
×
284
        wait.until(
×
285
            selenium.webdriver.support.expected_conditions.presence_of_all_elements_located(
286
                (selenium.webdriver.common.by.By.XPATH, xpath)
287
            )
288
        )
289

290
    if len(driver.find_elements(selenium.webdriver.common.by.By.XPATH, xpath)) != 0:
×
291
        return driver.find_element(selenium.webdriver.common.by.By.XPATH, xpath).text.strip()
×
292
    else:
293
        return safe_text
×
294

295

296
def input_xpath(
1✔
297
    driver: WebDriver,
298
    xpath: str,
299
    text: str,
300
    wait: WebDriverWait[WebDriver] | None = None,
301
    is_warn: bool = True,
302
) -> bool:
303
    if wait is not None:
×
304
        wait.until(
×
305
            selenium.webdriver.support.expected_conditions.element_to_be_clickable(
306
                (selenium.webdriver.common.by.By.XPATH, xpath)
307
            )
308
        )
309
        time.sleep(0.05)
×
310

311
    if xpath_exists(driver, xpath):
×
312
        driver.find_element(selenium.webdriver.common.by.By.XPATH, xpath).send_keys(text)
×
313
        return True
×
314
    else:
315
        if is_warn:
×
316
            logging.warning("Element is not found: %s", xpath)
×
317
        return False
×
318

319

320
def click_xpath(
1✔
321
    driver: WebDriver,
322
    xpath: str,
323
    wait: WebDriverWait[WebDriver] | None = None,
324
    is_warn: bool = True,
325
    move: bool = False,
326
) -> bool:
327
    if wait is not None:
×
328
        wait.until(
×
329
            selenium.webdriver.support.expected_conditions.element_to_be_clickable(
330
                (selenium.webdriver.common.by.By.XPATH, xpath)
331
            )
332
        )
333
        time.sleep(0.05)
×
334

335
    if xpath_exists(driver, xpath):
×
336
        elem = driver.find_element(selenium.webdriver.common.by.By.XPATH, xpath)
×
337
        if move:
×
338
            action = selenium.webdriver.common.action_chains.ActionChains(driver)
×
339
            action.move_to_element(elem)
×
340
            action.perform()
×
341

342
        elem.click()
×
343
        return True
×
344
    else:
345
        if is_warn:
×
346
            logging.warning("Element is not found: %s", xpath)
×
347
        return False
×
348

349

350
def is_display(driver: WebDriver, xpath: str) -> bool:
1✔
351
    return (len(driver.find_elements(selenium.webdriver.common.by.By.XPATH, xpath)) != 0) and (
×
352
        driver.find_element(selenium.webdriver.common.by.By.XPATH, xpath).is_displayed()
353
    )
354

355

356
def random_sleep(sec: float) -> None:
1✔
357
    RATIO = 0.8
1✔
358

359
    time.sleep((sec * RATIO) + (sec * (1 - RATIO) * 2) * random.random())  # noqa: S311
1✔
360

361

362
def with_retry(
1✔
363
    func: Callable[[], T],
364
    max_retries: int = 3,
365
    delay: float = 1.0,
366
    exceptions: tuple[type[Exception], ...] = (Exception,),
367
    on_retry: Callable[[int, Exception], bool | None] | None = None,
368
) -> T:
369
    """リトライ付きで関数を実行
370

371
    全て失敗した場合は最後の例外を再スロー。
372
    呼び出し側で try/except してエラー処理を行う。
373

374
    Args:
375
        func: 実行する関数
376
        max_retries: 最大リトライ回数
377
        delay: リトライ間の待機秒数
378
        exceptions: リトライ対象の例外タプル
379
        on_retry: リトライ時のコールバック (attempt, exception)
380
            - None または True を返すとリトライを継続
381
            - False を返すとリトライを中止して例外を再スロー
382

383
    Returns:
384
        成功時は関数の戻り値
385

386
    Raises:
387
        最後の例外を再スロー
388

389
    """
390
    last_exception: Exception | None = None
×
391

392
    for attempt in range(max_retries):
×
393
        try:
×
394
            return func()
×
395
        except exceptions as e:
×
396
            last_exception = e
×
397
            if attempt < max_retries - 1:
×
398
                if on_retry:
×
399
                    should_continue = on_retry(attempt + 1, e)
×
400
                    if should_continue is False:
×
401
                        raise
×
402
                time.sleep(delay)
×
403

404
    if last_exception:
×
405
        raise last_exception
×
406
    raise RuntimeError("Unexpected state in with_retry")
×
407

408

409
def wait_patiently(
1✔
410
    driver: WebDriver,
411
    wait: WebDriverWait[WebDriver],
412
    target: Any,
413
) -> None:
414
    error: selenium.common.exceptions.TimeoutException | None = None
×
415
    for i in range(WAIT_RETRY_COUNT + 1):
×
416
        try:
×
417
            wait.until(target)
×
418
            return
×
419
        except selenium.common.exceptions.TimeoutException as e:
×
420
            logging.warning(
×
421
                "タイムアウトが発生しました。(%s in %s line %d)",
422
                inspect.stack()[1].function,
423
                inspect.stack()[1].filename,
424
                inspect.stack()[1].lineno,
425
            )
426
            error = e
×
427

428
            logging.info(i)
×
429
            if i != WAIT_RETRY_COUNT:
×
430
                logging.info("refresh")
×
431
                driver.refresh()
×
432

433
    if error is not None:
×
434
        raise error
×
435

436

437
def dump_page(
1✔
438
    driver: WebDriver,
439
    index: int,
440
    dump_path: pathlib.Path,
441
    stack_index: int = 1,
442
) -> None:
443
    name = inspect.stack()[stack_index].function.replace("<", "").replace(">", "")
×
444

445
    dump_path.mkdir(parents=True, exist_ok=True)
×
446

447
    png_path = dump_path / f"{name}_{index:02d}.png"
×
448
    htm_path = dump_path / f"{name}_{index:02d}.htm"
×
449

450
    driver.save_screenshot(str(png_path))
×
451

452
    with htm_path.open("w", encoding="utf-8") as f:
×
453
        f.write(driver.page_source)
×
454

455
    logging.info(
×
456
        "page dump: %02d from %s in %s line %d",
457
        index,
458
        inspect.stack()[stack_index].function,
459
        inspect.stack()[stack_index].filename,
460
        inspect.stack()[stack_index].lineno,
461
    )
462

463

464
def clear_cache(driver: WebDriver) -> None:
1✔
465
    driver.execute_cdp_cmd("Network.clearBrowserCache", {})
×
466

467

468
def set_japanese_locale(driver: WebDriver) -> None:
1✔
469
    """CDP を使って日本語ロケールを強制設定
470

471
    Chrome の起動オプションだけでは言語設定が変わってしまうことがあるため、
472
    CDP を使って Accept-Language ヘッダーとロケールを強制的に日本語に設定する。
473
    """
474
    try:
×
475
        # NOTE: Network.setExtraHTTPHeaders は Network.enable を先に呼ばないと機能しない
476
        driver.execute_cdp_cmd("Network.enable", {})
×
477
        driver.execute_cdp_cmd(
×
478
            "Network.setExtraHTTPHeaders",
479
            {"headers": {"Accept-Language": "ja-JP,ja;q=0.9"}},
480
        )
481
        driver.execute_cdp_cmd(
×
482
            "Emulation.setLocaleOverride",
483
            {"locale": "ja-JP"},
484
        )
485
        logging.debug("Japanese locale set via CDP")
×
486
    except Exception:
×
487
        logging.warning("Failed to set Japanese locale via CDP")
×
488

489

490
def clean_dump(dump_path: pathlib.Path, keep_days: int = 1) -> None:
1✔
491
    if not dump_path.exists():
1✔
492
        return
1✔
493

494
    time_threshold = datetime.timedelta(keep_days)
1✔
495

496
    for item in dump_path.iterdir():
1✔
497
        if not item.is_file():
1✔
498
            continue
1✔
499
        try:
1✔
500
            time_diff = datetime.datetime.now(datetime.UTC) - datetime.datetime.fromtimestamp(
1✔
501
                item.stat().st_mtime, datetime.UTC
502
            )
503
        except FileNotFoundError:
×
504
            # ファイルが別プロセスにより削除された場合(SQLiteの一時ファイルなど)
505
            continue
×
506
        if time_diff > time_threshold:
1✔
507
            logging.warning("remove %s [%s day(s) old].", item.absolute(), f"{time_diff.days:,}")
1✔
508

509
            item.unlink(missing_ok=True)
1✔
510

511

512
def get_memory_info(driver: WebDriver) -> dict[str, Any]:
1✔
513
    """ブラウザのメモリ使用量を取得(単位: KB)"""
514
    total_bytes = subprocess.Popen(  # noqa: S602
×
515
        "smem -t -c pss -P chrome | tail -n 1",  # noqa: S607
516
        shell=True,
517
        stdout=subprocess.PIPE,
518
    ).communicate()[0]
519
    total = int(str(total_bytes, "utf-8").strip())  # smem の出力は KB 単位
×
520

521
    try:
×
522
        memory_info = driver.execute_cdp_cmd("Memory.getAllTimeSamplingProfile", {})
×
523
        heap_usage = driver.execute_cdp_cmd("Runtime.getHeapUsage", {})
×
524

525
        heap_used = heap_usage.get("usedSize", 0) // 1024  # bytes → KB
×
526
        heap_total = heap_usage.get("totalSize", 0) // 1024  # bytes → KB
×
527
    except Exception as e:
×
528
        logging.debug("Failed to get memory usage: %s", e)
×
529

530
        memory_info = None
×
531
        heap_used = 0
×
532
        heap_total = 0
×
533

534
    return {
×
535
        "total": total,
536
        "heap_used": heap_used,
537
        "heap_total": heap_total,
538
        "memory_info": memory_info,
539
    }
540

541

542
def log_memory_usage(driver: WebDriver) -> None:
1✔
543
    mem_info = get_memory_info(driver)
×
544
    logging.info(
×
545
        "Chrome memory: %s MB (JS heap: %s MB)",
546
        f"""{mem_info["total"] // 1024:,}""",
547
        f"""{mem_info["heap_used"] // 1024:,}""",
548
    )
549

550

551
def _warmup(
1✔
552
    driver: WebDriver,
553
    keyword: str,
554
    url_pattern: str,
555
    sleep_sec: int = 3,
556
) -> None:
557
    # NOTE: ダミーアクセスを行って BOT ではないと思わせる。(効果なさそう...)
558
    driver.get("https://www.yahoo.co.jp/")
×
559
    time.sleep(sleep_sec)
×
560

561
    driver.find_element(selenium.webdriver.common.by.By.XPATH, '//input[@name="p"]').send_keys(keyword)
×
562
    driver.find_element(selenium.webdriver.common.by.By.XPATH, '//input[@name="p"]').send_keys(
×
563
        selenium.webdriver.common.keys.Keys.ENTER
564
    )
565

566
    time.sleep(sleep_sec)
×
567

568
    driver.find_element(
×
569
        selenium.webdriver.common.by.By.XPATH, f'//a[contains(@href, "{url_pattern}")]'
570
    ).click()
571

572
    time.sleep(sleep_sec)
×
573

574

575
class browser_tab:
1✔
576
    """新しいブラウザタブで URL を開くコンテキストマネージャ"""
577

578
    def __init__(self, driver: WebDriver, url: str) -> None:
1✔
579
        """初期化
580

581
        Args:
582
            driver: WebDriver インスタンス
583
            url: 開く URL
584

585
        """
586
        self.driver = driver
1✔
587
        self.url = url
1✔
588
        self.original_window: str | None = None
1✔
589

590
    def __enter__(self) -> None:
1✔
591
        """新しいタブを開いて URL にアクセス"""
592
        self.original_window = self.driver.current_window_handle
1✔
593
        self.driver.execute_script("window.open('');")
1✔
594
        self.driver.switch_to.window(self.driver.window_handles[-1])
1✔
595
        try:
1✔
596
            self.driver.get(self.url)
1✔
597
        except Exception:
×
598
            # NOTE: URL読み込みに失敗した場合もクリーンアップしてから例外を再送出
599
            self._cleanup()
×
600
            raise
×
601

602
    def _cleanup(self) -> None:
1✔
603
        """タブを閉じて元のウィンドウに戻る"""
604
        try:
1✔
605
            # 余分なタブを閉じる
606
            while len(self.driver.window_handles) > 1:
1✔
607
                self.driver.switch_to.window(self.driver.window_handles[-1])
1✔
608
                self.driver.close()
1✔
609
            if self.original_window is not None:
1✔
610
                self.driver.switch_to.window(self.original_window)
1✔
611
            time.sleep(0.1)
1✔
612
        except Exception:
1✔
613
            # NOTE: Chromeがクラッシュした場合は無視(既に終了しているため操作不可)
614
            logging.exception("タブのクリーンアップに失敗しました(Chromeがクラッシュした可能性があります)")
1✔
615

616
    def _recover_from_error(self) -> None:
1✔
617
        """エラー後にブラウザの状態を回復する"""
618
        try:
×
619
            # ページロードタイムアウトをリセット(負の値になっている可能性があるため)
620
            self.driver.set_page_load_timeout(30)
×
621

622
            # about:blank に移動してレンダラーの状態をリセット
623
            self.driver.get("about:blank")
×
624
            time.sleep(0.5)
×
625
        except Exception:
×
626
            logging.warning("ブラウザの回復に失敗しました")
×
627

628
    def __exit__(
1✔
629
        self,
630
        exception_type: type[BaseException] | None,
631
        exception_value: BaseException | None,
632
        traceback: types.TracebackType | None,
633
    ) -> None:
634
        """タブを閉じて元のウィンドウに戻る"""
635
        self._cleanup()
1✔
636

637
        # 例外が発生した場合はブラウザの状態を回復
638
        if exception_type is not None:
1✔
639
            self._recover_from_error()
×
640

641

642
class error_handler:
1✔
643
    """Selenium操作時のエラーハンドリング用コンテキストマネージャ
644

645
    エラー発生時に自動でログ出力、スクリーンショット取得、コールバック呼び出しを行う。
646

647
    Args:
648
        driver: WebDriver インスタンス
649
        message: ログに出力するエラーメッセージ
650
        on_error: エラー時に呼ばれるコールバック関数
651
            (exception, screenshot: PIL.Image.Image | None, page_source: str | None) -> None
652
        capture_screenshot: スクリーンショットを自動取得するか(デフォルト: True)
653
        reraise: 例外を再送出するか(デフォルト: True)
654

655
    Attributes:
656
        exception: 発生した例外(エラーがなければ None)
657
        screenshot: 取得したスクリーンショット(PIL.Image.Image、取得失敗時は None)
658
        page_source: 取得したページソース(取得失敗時は None)
659

660
    Examples:
661
        基本的な使用方法::
662

663
            with my_lib.selenium_util.error_handler(driver, message="ログイン処理に失敗") as handler:
664
                driver.get(login_url)
665
                driver.find_element(...).click()
666

667
        コールバック付き(Slack通知など)::
668

669
            def notify(exc, screenshot, page_source):
670
                slack.notify_error_with_page(config, "エラー発生", exc, screenshot, page_source)
671

672
            with my_lib.selenium_util.error_handler(
673
                driver,
674
                message="クロール処理に失敗",
675
                on_error=notify,
676
            ):
677
                crawl_page(driver)
678

679
        例外を抑制して続行::
680

681
            with my_lib.selenium_util.error_handler(driver, reraise=False) as handler:
682
                risky_operation()
683

684
            if handler.exception:
685
                logging.warning("処理をスキップしました")
686

687
    """
688

689
    def __init__(
1✔
690
        self,
691
        driver: WebDriver,
692
        message: str = "Selenium operation failed",
693
        on_error: Callable[[Exception, PIL.Image.Image | None, str | None], None] | None = None,
694
        capture_screenshot: bool = True,
695
        reraise: bool = True,
696
    ) -> None:
697
        """初期化"""
698
        self.driver = driver
1✔
699
        self.message = message
1✔
700
        self.on_error = on_error
1✔
701
        self.capture_screenshot = capture_screenshot
1✔
702
        self.reraise = reraise
1✔
703
        self.exception: Exception | None = None
1✔
704
        self.screenshot: PIL.Image.Image | None = None
1✔
705
        self.page_source: str | None = None
1✔
706

707
    def __enter__(self) -> Self:
1✔
708
        """コンテキストマネージャの開始"""
709
        return self
1✔
710

711
    def __exit__(
1✔
712
        self,
713
        exception_type: type[BaseException] | None,
714
        exception_value: BaseException | None,
715
        traceback: types.TracebackType | None,
716
    ) -> bool:
717
        """コンテキストマネージャの終了、エラー処理を実行"""
718
        if exception_value is None:
1✔
719
            return False
1✔
720

721
        # 例外を記録
722
        if isinstance(exception_value, Exception):
1✔
723
            self.exception = exception_value
1✔
724
        else:
725
            # BaseException(KeyboardInterrupt など)は処理せず再送出
726
            return False
×
727

728
        # ログ出力
729
        logging.exception(self.message)
1✔
730

731
        # スクリーンショット・ページソース取得
732
        if self.capture_screenshot:
1✔
733
            try:
1✔
734
                self.page_source = self.driver.page_source
1✔
735
            except Exception:
×
736
                logging.debug("Failed to capture page source for error handling")
×
737
            try:
1✔
738
                screenshot_bytes = self.driver.get_screenshot_as_png()
1✔
739
                self.screenshot = PIL.Image.open(io.BytesIO(screenshot_bytes))
1✔
740
            except Exception:
1✔
741
                logging.debug("Failed to capture screenshot for error handling")
1✔
742

743
        # コールバック呼び出し
744
        if self.on_error is not None:
1✔
745
            try:
1✔
746
                self.on_error(self.exception, self.screenshot, self.page_source)
1✔
747
            except Exception:
×
748
                logging.exception("Error in on_error callback")
×
749

750
        # reraise=False なら例外を抑制
751
        return not self.reraise
1✔
752

753

754
def _is_chrome_related_process(process: psutil.Process) -> bool:
1✔
755
    """プロセスがChrome関連かどうかを判定"""
756
    try:
1✔
757
        process_name = process.name().lower()
1✔
758
        # Chrome関連のプロセス名パターン
759
        chrome_patterns = ["chrome", "chromium", "google-chrome", "undetected_chro"]
1✔
760
        # chromedriverは除外
761
        if "chromedriver" in process_name:
1✔
762
            return False
1✔
763
        return any(pattern in process_name for pattern in chrome_patterns)
1✔
764
    except (psutil.NoSuchProcess, psutil.AccessDenied):
1✔
765
        return False
1✔
766

767

768
def _get_chrome_processes_by_pgid(chromedriver_pid: int, existing_pids: set[int]) -> list[int]:
1✔
769
    """プロセスグループIDで追加のChrome関連プロセスを取得"""
770
    additional_pids = []
×
771
    try:
×
772
        pgid = os.getpgid(chromedriver_pid)
×
773
        for proc in psutil.process_iter(["pid", "name", "ppid"]):
×
774
            if proc.info["pid"] in existing_pids:
×
775
                continue
×
776
            try:
×
777
                if os.getpgid(proc.info["pid"]) == pgid:
×
778
                    proc_obj = psutil.Process(proc.info["pid"])
×
779
                    if _is_chrome_related_process(proc_obj):
×
780
                        additional_pids.append(proc.info["pid"])
×
781
                        logging.debug(
×
782
                            "Found Chrome-related process by pgid: PID %d, name: %s",
783
                            proc.info["pid"],
784
                            proc.info["name"],
785
                        )
786
            except (psutil.NoSuchProcess, psutil.AccessDenied, OSError):
×
787
                pass
×
788
    except (OSError, psutil.NoSuchProcess):
×
789
        logging.debug("Failed to get process group ID for chromedriver")
×
790
    return additional_pids
×
791

792

793
def _get_chrome_related_processes(driver: WebDriver) -> list[int]:
1✔
794
    """Chrome関連の全子プロセスを取得
795

796
    undetected_chromedriver 使用時、Chrome プロセスは chromedriver の子ではなく
797
    Python プロセスの直接の子として起動されることがあるため、両方を検索する。
798
    """
799
    chrome_pids = set()
×
800

801
    # 1. driver.service.process の子プロセスを検索
802
    try:
×
803
        if hasattr(driver, "service") and driver.service and hasattr(driver.service, "process"):  # type: ignore[attr-defined]
×
804
            process = driver.service.process  # type: ignore[attr-defined]
×
805
            if process and hasattr(process, "pid"):
×
806
                chromedriver_pid = process.pid
×
807

808
                # psutilでプロセス階層を取得
809
                parent_process = psutil.Process(chromedriver_pid)
×
810
                children = parent_process.children(recursive=True)
×
811

812
                for child in children:
×
813
                    chrome_pids.add(child.pid)
×
814
                    logging.debug(
×
815
                        "Found Chrome-related process (service child): PID %d, name: %s",
816
                        child.pid,
817
                        child.name(),
818
                    )
819
    except Exception:
×
820
        logging.exception("Failed to get Chrome-related processes from service")
×
821

822
    # 2. 現在の Python プロセスの全子孫から Chrome 関連プロセスを検索
823
    try:
×
824
        current_process = psutil.Process()
×
825
        all_children = current_process.children(recursive=True)
×
826

827
        for child in all_children:
×
828
            if child.pid in chrome_pids:
×
829
                continue
×
830
            try:
×
831
                if _is_chrome_related_process(child):
×
832
                    chrome_pids.add(child.pid)
×
833
                    logging.debug(
×
834
                        "Found Chrome-related process (python child): PID %d, name: %s",
835
                        child.pid,
836
                        child.name(),
837
                    )
838
            except (psutil.NoSuchProcess, psutil.AccessDenied):
×
839
                pass
×
840
    except Exception:
×
841
        logging.exception("Failed to get Chrome-related processes from python children")
×
842

843
    return list(chrome_pids)
×
844

845

846
def _send_signal_to_processes(pids: list[int], sig: signal.Signals, signal_name: str) -> None:
1✔
847
    """プロセスリストに指定されたシグナルを送信"""
848
    errors = []
×
849
    for pid in pids:
×
850
        try:
×
851
            # プロセス名を取得
852
            try:
×
853
                process = psutil.Process(pid)
×
854
                process_name = process.name()
×
855
            except (psutil.NoSuchProcess, psutil.AccessDenied):
×
856
                process_name = "unknown"
×
857

858
            if sig == signal.SIGKILL:
×
859
                # プロセスがまだ存在するかチェック
860
                os.kill(pid, 0)  # シグナル0は存在確認
×
861
            os.kill(pid, sig)
×
862
            logging.info("Sent %s to process: PID %d (%s)", signal_name, pid, process_name)
×
863
        except (ProcessLookupError, OSError) as e:
×
864
            # プロセスが既に終了している場合は無視
865
            errors.append((pid, e))
×
866

867
    # エラーが発生した場合はまとめてログ出力
868
    if errors:
×
869
        logging.debug("Failed to send %s to some processes: %s", signal_name, errors)
×
870

871

872
def _terminate_chrome_processes(chrome_pids: list[int], timeout: float = 5.0) -> None:
1✔
873
    """Chrome関連プロセスを段階的に終了
874

875
    Args:
876
        chrome_pids: 終了対象のプロセスIDリスト
877
        timeout: SIGTERM後にプロセス終了を待機する最大時間(秒)
878

879
    """
880
    if not chrome_pids:
×
881
        return
×
882

883
    # 優雅な終了(SIGTERM)
884
    _send_signal_to_processes(chrome_pids, signal.SIGTERM, "SIGTERM")
×
885

886
    # プロセスの終了を待機(ポーリング)
887
    remaining_pids = list(chrome_pids)
×
888
    poll_interval = 0.2
×
889
    elapsed = 0.0
×
890

891
    while remaining_pids and elapsed < timeout:
×
892
        time.sleep(poll_interval)
×
893
        elapsed += poll_interval
×
894

895
        # まだ生存しているプロセスをチェック
896
        still_alive = []
×
897
        for pid in remaining_pids:
×
898
            try:
×
899
                if psutil.pid_exists(pid):
×
900
                    process = psutil.Process(pid)
×
901
                    if process.is_running() and process.status() != psutil.STATUS_ZOMBIE:
×
902
                        still_alive.append(pid)
×
903
            except (psutil.NoSuchProcess, psutil.AccessDenied):
×
904
                pass
×
905

906
        remaining_pids = still_alive
×
907

908
    # タイムアウト後もまだ残っているプロセスにのみ SIGKILL を送信
909
    if remaining_pids:
×
910
        logging.warning(
×
911
            "Chrome processes still alive after %.1fs, sending SIGKILL to %d processes",
912
            elapsed,
913
            len(remaining_pids),
914
        )
915
        _send_signal_to_processes(remaining_pids, signal.SIGKILL, "SIGKILL")
×
916

917

918
def _reap_single_process(pid: int) -> None:
1✔
919
    """単一プロセスをwaitpidで回収"""
920
    try:
×
921
        # ノンブロッキングでwaitpid
922
        result_pid, status = os.waitpid(pid, os.WNOHANG)
×
923
        if result_pid == pid:
×
924
            # プロセス名を取得
925
            try:
×
926
                process = psutil.Process(pid)
×
927
                process_name = process.name()
×
928
            except (psutil.NoSuchProcess, psutil.AccessDenied):
×
929
                process_name = "unknown"
×
930
            logging.debug("Reaped Chrome process: PID %d (%s)", pid, process_name)
×
931
    except (ChildProcessError, OSError):
×
932
        # 子プロセスでない場合や既に回収済みの場合は無視
933
        pass
×
934

935

936
def _reap_chrome_processes(chrome_pids: list[int]) -> None:
1✔
937
    """Chrome関連プロセスを明示的に回収してゾンビ化を防ぐ"""
938
    for pid in chrome_pids:
×
939
        _reap_single_process(pid)
×
940

941

942
def _get_remaining_chrome_pids(chrome_pids: list[int]) -> list[int]:
1✔
943
    """指定されたPIDリストから、まだ生存しているChrome関連プロセスを取得"""
944
    remaining = []
×
945
    for pid in chrome_pids:
×
946
        try:
×
947
            if psutil.pid_exists(pid):
×
948
                process = psutil.Process(pid)
×
949
                if (
×
950
                    process.is_running()
951
                    and process.status() != psutil.STATUS_ZOMBIE
952
                    and _is_chrome_related_process(process)
953
                ):
954
                    remaining.append(pid)
×
955
        except (psutil.NoSuchProcess, psutil.AccessDenied):
×
956
            pass
×
957
    return remaining
×
958

959

960
def _wait_for_processes_with_check(
1✔
961
    chrome_pids: list[int],
962
    timeout: float,
963
    poll_interval: float = 0.2,
964
    log_interval: float = 1.0,
965
) -> list[int]:
966
    """プロセスの終了を待機しつつ、残存プロセスをチェック
967

968
    Args:
969
        chrome_pids: 監視対象のプロセスIDリスト
970
        timeout: 最大待機時間(秒)
971
        poll_interval: チェック間隔(秒)
972
        log_interval: ログ出力間隔(秒)
973

974
    Returns:
975
        タイムアウト後も残存しているプロセスIDのリスト
976

977
    """
978
    elapsed = 0.0
×
979
    last_log_time = 0.0
×
980
    remaining_pids = list(chrome_pids)
×
981

982
    while remaining_pids and elapsed < timeout:
×
983
        time.sleep(poll_interval)
×
984
        elapsed += poll_interval
×
985
        remaining_pids = _get_remaining_chrome_pids(remaining_pids)
×
986

987
        if remaining_pids and (elapsed - last_log_time) >= log_interval:
×
988
            logging.info(
×
989
                "Found %d remaining Chrome processes after %.0fs",
990
                len(remaining_pids),
991
                elapsed,
992
            )
993
            last_log_time = elapsed
×
994

995
    return remaining_pids
×
996

997

998
def quit_driver_gracefully(
1✔
999
    driver: WebDriver | None,
1000
    wait_sec: float = 5.0,
1001
    sigterm_wait_sec: float = 5.0,
1002
    sigkill_wait_sec: float = 5.0,
1003
) -> None:
1004
    """Chrome WebDriverを確実に終了する
1005

1006
    終了フロー:
1007
    1. driver.quit() を呼び出し
1008
    2. wait_sec 秒待機しつつプロセス終了をチェック
1009
    3. 残存プロセスがあれば SIGTERM を送信
1010
    4. sigterm_wait_sec 秒待機しつつプロセス終了をチェック
1011
    5. 残存プロセスがあれば SIGKILL を送信
1012
    6. sigkill_wait_sec 秒待機
1013

1014
    Args:
1015
        driver: 終了する WebDriver インスタンス
1016
        wait_sec: quit 後にプロセス終了を待機する秒数(デフォルト: 5秒)
1017
        sigterm_wait_sec: SIGTERM 後にプロセス終了を待機する秒数(デフォルト: 5秒)
1018
        sigkill_wait_sec: SIGKILL 後にプロセス回収を待機する秒数(デフォルト: 5秒)
1019

1020
    """
1021
    if driver is None:
1✔
1022
        return
1✔
1023

1024
    # quit前にChrome関連プロセスを記録
1025
    chrome_pids_before = _get_chrome_related_processes(driver)
1✔
1026

1027
    try:
1✔
1028
        # WebDriverの正常終了を試行(これがタブのクローズも含む)
1029
        driver.quit()
1✔
1030
        logging.info("WebDriver quit successfully")
1✔
1031
    except Exception:
1✔
1032
        logging.warning("Failed to quit driver normally", exc_info=True)
1✔
1033
    finally:
1034
        # undetected_chromedriver の __del__ がシャットダウン時に再度呼ばれるのを防ぐ
1035
        if hasattr(driver, "_has_quit"):
1✔
1036
            driver._has_quit = True  # type: ignore[attr-defined]
1✔
1037

1038
    # ChromeDriverサービスの停止を試行
1039
    try:
1✔
1040
        if hasattr(driver, "service") and driver.service and hasattr(driver.service, "stop"):  # type: ignore[attr-defined]
1✔
1041
            driver.service.stop()  # type: ignore[attr-defined]
×
1042
    except (ConnectionResetError, OSError):
×
1043
        # Chrome が既に終了している場合は無視
1044
        logging.debug("Chrome service already stopped")
×
1045
    except Exception:
×
1046
        logging.warning("Failed to stop Chrome service", exc_info=True)
×
1047

1048
    # Step 1: quit 後に wait_sec 秒待機しつつプロセス終了をチェック
1049
    remaining_pids = _wait_for_processes_with_check(chrome_pids_before, wait_sec)
1✔
1050

1051
    if not remaining_pids:
1✔
1052
        logging.debug("All Chrome processes exited normally")
1✔
1053
        return
1✔
1054

1055
    # Step 2: 残存プロセスに SIGTERM を送信
1056
    logging.info(
×
1057
        "Found %d remaining Chrome processes after %.0fs, sending SIGTERM",
1058
        len(remaining_pids),
1059
        wait_sec,
1060
    )
1061
    _send_signal_to_processes(remaining_pids, signal.SIGTERM, "SIGTERM")
×
1062

1063
    # Step 3: SIGTERM 後に sigterm_wait_sec 秒待機しつつプロセス終了をチェック
1064
    remaining_pids = _wait_for_processes_with_check(remaining_pids, sigterm_wait_sec)
×
1065

1066
    if not remaining_pids:
×
1067
        logging.info("All Chrome processes exited after SIGTERM")
×
1068
        _reap_chrome_processes(chrome_pids_before)
×
1069
        return
×
1070

1071
    # Step 4: 残存プロセスに SIGKILL を送信
1072
    logging.warning(
×
1073
        "Chrome processes still alive after SIGTERM + %.1fs, sending SIGKILL to %d processes",
1074
        sigterm_wait_sec,
1075
        len(remaining_pids),
1076
    )
1077
    _send_signal_to_processes(remaining_pids, signal.SIGKILL, "SIGKILL")
×
1078

1079
    # Step 5: SIGKILL 後に sigkill_wait_sec 秒待機してプロセス回収
1080
    time.sleep(sigkill_wait_sec)
×
1081
    _reap_chrome_processes(chrome_pids_before)
×
1082

1083
    # 最終チェック:まだ残っているプロセスがあるか確認
1084
    still_remaining = _get_remaining_chrome_pids(remaining_pids)
×
1085

1086
    # 回収できなかったプロセスについて警告
1087
    if still_remaining:
×
1088
        for pid in still_remaining:
×
1089
            try:
×
1090
                process = psutil.Process(pid)
×
1091
                logging.warning("Failed to collect Chrome-related process: PID %d (%s)", pid, process.name())
×
1092
            except (psutil.NoSuchProcess, psutil.AccessDenied):
×
1093
                pass
×
1094

1095

1096
if __name__ == "__main__":
1097
    import pathlib
1098

1099
    import docopt
1100
    import selenium.webdriver.support.wait
1101

1102
    import my_lib.config
1103
    import my_lib.logger
1104

1105
    assert __doc__ is not None  # noqa: S101
1106
    args = docopt.docopt(__doc__)
1107

1108
    config_file = args["-c"]
1109
    debug_mode = args["-D"]
1110

1111
    my_lib.logger.init("test", level=logging.DEBUG if debug_mode else logging.INFO)
1112

1113
    config = my_lib.config.load(config_file)
1114

1115
    driver = create_driver("test", pathlib.Path(config["data"]["selenium"]))
1116
    wait = selenium.webdriver.support.wait.WebDriverWait(driver, 5)
1117

1118
    driver.get("https://www.google.com/")
1119
    wait.until(
1120
        selenium.webdriver.support.expected_conditions.presence_of_element_located(
1121
            (selenium.webdriver.common.by.By.XPATH, '//input[contains(@value, "Google")]')
1122
        )
1123
    )
1124

1125
    quit_driver_gracefully(driver)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc