• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kimata / my-py-lib / 20688532990

04 Jan 2026 05:57AM UTC coverage: 64.791% (-0.2%) from 64.965%
20688532990

push

github

kimata
refactor: Chrome プロファイル関連関数を chrome_util.py に集約

selenium_util.py から以下の関数を chrome_util.py に移動:
- _ProfileHealthResult, _check_json_file, _check_sqlite_db
- _check_profile_health, _recover_corrupted_profile
- _cleanup_profile_lock, _is_running_in_container
- _cleanup_orphaned_chrome_processes_in_container
- _get_actual_profile_name, delete_profile

chrome_util.py の未使用関数を削除:
- cleanup_old_chrome_profiles, cleanup_orphaned_chrome_processes
- _cleanup_chrome_process_groups, get_chrome_profile_stats

selenium_util.py では chrome_util をインポートして使用
delete_profile は後方互換性のため再エクスポート

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

102 of 136 new or added lines in 2 files covered. (75.0%)

2 existing lines in 2 files now uncovered.

3097 of 4780 relevant lines covered (64.79%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

32.91
/src/my_lib/selenium_util.py
1
#!/usr/bin/env python3
2
"""
3
Selenium を Chrome Driver を使って動かします。
4

5
Usage:
6
  selenium_util.py [-c CONFIG] [-D]
7

8
Options:
9
  -c CONFIG         : CONFIG を設定ファイルとして読み込んで実行します。
10
                      [default: tests/fixtures/config.example.yaml]
11
  -D                : デバッグモードで動作します。
12
"""
13

14
from __future__ import annotations
1✔
15

16
import datetime
1✔
17
import inspect
1✔
18
import io
1✔
19
import logging
1✔
20
import os
1✔
21
import pathlib
1✔
22
import random
1✔
23
import re
1✔
24
import signal
1✔
25
import subprocess
1✔
26
import time
1✔
27
from typing import TYPE_CHECKING, Any, Self, TypeVar
1✔
28

29
import PIL.Image
1✔
30
import psutil
1✔
31
import selenium
1✔
32
import selenium.common.exceptions
1✔
33
import selenium.webdriver.chrome.options
1✔
34
import selenium.webdriver.chrome.service
1✔
35
import selenium.webdriver.common.action_chains
1✔
36
import selenium.webdriver.common.by
1✔
37
import selenium.webdriver.common.keys
1✔
38
import selenium.webdriver.support.expected_conditions
1✔
39
import undetected_chromedriver
1✔
40

41
import my_lib.chrome_util
1✔
42
import my_lib.time
1✔
43

44
if TYPE_CHECKING:
45
    import types
46
    from collections.abc import Callable
47

48
    from selenium.webdriver.remote.webdriver import WebDriver
49
    from selenium.webdriver.support.wait import WebDriverWait
50

51
T = TypeVar("T")
1✔
52

53
WAIT_RETRY_COUNT: int = 1
1✔
54

55

56
class SeleniumError(Exception):
1✔
57
    """Selenium 関連エラーの基底クラス"""
58

59

60
def _get_chrome_version() -> int | None:
1✔
61
    try:
1✔
62
        result = subprocess.run(
1✔
63
            ["google-chrome", "--version"],  # noqa: S607
64
            capture_output=True,
65
            text=True,
66
            timeout=10,
67
            check=False,
68
        )
69
        match = re.search(r"(\d+)\.", result.stdout)
1✔
70
        if match:
1✔
71
            return int(match.group(1))
1✔
72
    except Exception:
1✔
73
        logging.warning("Failed to detect Chrome version")
1✔
74
    return None
1✔
75

76

77
def _create_driver_impl(
1✔
78
    profile_name: str,
79
    data_path: pathlib.Path,
80
    is_headless: bool,  # noqa: FBT001
81
    use_subprocess: bool,  # noqa: FBT001
82
) -> WebDriver:
83
    chrome_data_path = data_path / "chrome"
×
84
    log_path = data_path / "log"
×
85

86
    # NOTE: Pytest を並列実行できるようにする
87
    suffix = os.environ.get("PYTEST_XDIST_WORKER", None)
×
88
    if suffix is not None:
×
89
        profile_name += "." + suffix
×
90

91
    chrome_data_path.mkdir(parents=True, exist_ok=True)
×
92
    log_path.mkdir(parents=True, exist_ok=True)
×
93

94
    options = selenium.webdriver.chrome.options.Options()
×
95

96
    if is_headless:
×
97
        options.add_argument("--headless=new")
×
98

99
    options.add_argument("--no-sandbox")  # for Docker
×
100
    options.add_argument("--disable-dev-shm-usage")  # for Docker
×
101
    options.add_argument("--disable-gpu")
×
102

103
    options.add_argument("--disable-popup-blocking")
×
104
    options.add_argument("--disable-plugins")
×
105

106
    options.add_argument("--no-first-run")
×
107

108
    options.add_argument("--lang=ja-JP")
×
109
    options.add_argument("--window-size=1920,1080")
×
110

111
    # NOTE: Accept-Language ヘッダーを日本語優先に設定
112
    options.add_experimental_option("prefs", {"intl.accept_languages": "ja-JP,ja,en-US,en"})
×
113

114
    options.add_argument("--user-data-dir=" + str(chrome_data_path / profile_name))
×
115

116
    options.add_argument("--enable-logging")
×
117
    options.add_argument("--v=1")
×
118

119
    chrome_log_file = log_path / f"chrome_{profile_name}.log"
×
120
    options.add_argument(f"--log-file={chrome_log_file!s}")
×
121

122
    if not is_headless:
×
123
        options.add_argument("--auto-open-devtools-for-tabs")
×
124

125
    service = selenium.webdriver.chrome.service.Service(
×
126
        service_args=["--verbose", f"--log-path={str(log_path / 'webdriver.log')!s}"],
127
    )
128

129
    chrome_version = _get_chrome_version()
×
130

131
    # NOTE: user_multi_procs=True は既存の chromedriver ファイルが存在することを前提としているため、
132
    # ファイルが存在しない場合(CI環境の初回実行など)は False にする
133
    uc_data_path = pathlib.Path("~/.local/share/undetected_chromedriver").expanduser()
×
134
    use_multi_procs = uc_data_path.exists() and any(uc_data_path.glob("*chromedriver*"))
×
135

136
    driver = undetected_chromedriver.Chrome(
×
137
        service=service,
138
        options=options,
139
        use_subprocess=use_subprocess,
140
        version_main=chrome_version,
141
        user_multi_procs=use_multi_procs,
142
    )
143

144
    driver.set_page_load_timeout(30)
×
145

146
    # CDP を使って日本語ロケールを強制設定
147
    set_japanese_locale(driver)
×
148

149
    return driver
×
150

151

152
# chrome_util から再エクスポート(後方互換性のため)
153
delete_profile = my_lib.chrome_util.delete_profile
1✔
154

155

156
def create_driver(  # noqa: PLR0913
1✔
157
    profile_name: str,
158
    data_path: pathlib.Path,
159
    is_headless: bool = True,  # noqa: FBT001
160
    clean_profile: bool = False,  # noqa: FBT001
161
    auto_recover: bool = True,  # noqa: FBT001
162
    use_subprocess: bool = False,  # noqa: FBT001
163
) -> WebDriver:
164
    """Chrome WebDriver を作成する
165

166
    Args:
167
        profile_name: プロファイル名
168
        data_path: データディレクトリのパス
169
        is_headless: ヘッドレスモードで起動するか
170
        clean_profile: 起動前にロックファイルを削除するか
171
        auto_recover: プロファイル破損時に自動リカバリするか
172
        use_subprocess: サブプロセスで Chrome を起動するか
173

174
    """
175
    # NOTE: ルートロガーの出力レベルを変更した場合でも Selenium 関係は抑制する
176
    logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)
×
177
    logging.getLogger("selenium.webdriver.common.selenium_manager").setLevel(logging.WARNING)
×
178
    logging.getLogger("selenium.webdriver.remote.remote_connection").setLevel(logging.WARNING)
×
179

180
    # NOTE: chrome_util の内部関数を使用(同一パッケージ内での使用は許容)
NEW
181
    actual_profile_name = my_lib.chrome_util._get_actual_profile_name(profile_name)  # noqa: SLF001
×
UNCOV
182
    profile_path = data_path / "chrome" / actual_profile_name
×
183

184
    # プロファイル健全性チェック
NEW
185
    health = my_lib.chrome_util._check_profile_health(profile_path)  # noqa: SLF001
×
186
    if not health.is_healthy:
×
187
        logging.warning("Profile health check failed: %s", ", ".join(health.errors))
×
188

189
        if health.has_lock_files and not (health.has_corrupted_json or health.has_corrupted_db):
×
190
            # ロックファイルのみの問題なら削除して続行
191
            logging.info("Cleaning up lock files only")
×
NEW
192
            my_lib.chrome_util._cleanup_profile_lock(profile_path)  # noqa: SLF001
×
193
        elif auto_recover and (health.has_corrupted_json or health.has_corrupted_db):
×
194
            # JSON または DB が破損している場合はプロファイルをリカバリ
195
            logging.warning("Profile is corrupted, attempting recovery")
×
NEW
196
            if my_lib.chrome_util._recover_corrupted_profile(profile_path):  # noqa: SLF001
×
197
                logging.info("Profile recovery successful, will create new profile")
×
198
            else:
199
                logging.error("Profile recovery failed")
×
200

201
    if clean_profile:
×
NEW
202
        my_lib.chrome_util._cleanup_profile_lock(profile_path)  # noqa: SLF001
×
203

204
    # NOTE: 1回だけ自動リトライ
205
    try:
×
206
        return _create_driver_impl(profile_name, data_path, is_headless, use_subprocess)
×
207
    except Exception as e:
×
208
        logging.warning("First attempt to create driver failed: %s", e)
×
209

210
        # コンテナ内で実行中の場合のみ、残った Chrome プロセスをクリーンアップ
NEW
211
        my_lib.chrome_util._cleanup_orphaned_chrome_processes_in_container()  # noqa: SLF001
×
212

213
        # プロファイルのロックファイルを削除
NEW
214
        my_lib.chrome_util._cleanup_profile_lock(profile_path)  # noqa: SLF001
×
215

216
        # 再度健全性チェック
NEW
217
        health = my_lib.chrome_util._check_profile_health(profile_path)  # noqa: SLF001
×
218
        if not health.is_healthy and auto_recover and (health.has_corrupted_json or health.has_corrupted_db):
×
219
            logging.warning("Profile still corrupted after first attempt, recovering")
×
NEW
220
            my_lib.chrome_util._recover_corrupted_profile(profile_path)  # noqa: SLF001
×
221

222
        return _create_driver_impl(profile_name, data_path, is_headless, use_subprocess)
×
223

224

225
def xpath_exists(driver: WebDriver, xpath: str) -> bool:
1✔
226
    return len(driver.find_elements(selenium.webdriver.common.by.By.XPATH, xpath)) != 0
×
227

228

229
def get_text(
1✔
230
    driver: WebDriver,
231
    xpath: str,
232
    safe_text: str,
233
    wait: WebDriverWait[WebDriver] | None = None,
234
) -> str:
235
    if wait is not None:
×
236
        wait.until(
×
237
            selenium.webdriver.support.expected_conditions.presence_of_all_elements_located(
238
                (selenium.webdriver.common.by.By.XPATH, xpath)
239
            )
240
        )
241

242
    if len(driver.find_elements(selenium.webdriver.common.by.By.XPATH, xpath)) != 0:
×
243
        return driver.find_element(selenium.webdriver.common.by.By.XPATH, xpath).text.strip()
×
244
    else:
245
        return safe_text
×
246

247

248
def input_xpath(
1✔
249
    driver: WebDriver,
250
    xpath: str,
251
    text: str,
252
    wait: WebDriverWait[WebDriver] | None = None,
253
    is_warn: bool = True,  # noqa: FBT001
254
) -> bool:
255
    if wait is not None:
×
256
        wait.until(
×
257
            selenium.webdriver.support.expected_conditions.element_to_be_clickable(
258
                (selenium.webdriver.common.by.By.XPATH, xpath)
259
            )
260
        )
261
        time.sleep(0.05)
×
262

263
    if xpath_exists(driver, xpath):
×
264
        driver.find_element(selenium.webdriver.common.by.By.XPATH, xpath).send_keys(text)
×
265
        return True
×
266
    else:
267
        if is_warn:
×
268
            logging.warning("Element is not found: %s", xpath)
×
269
        return False
×
270

271

272
def click_xpath(
1✔
273
    driver: WebDriver,
274
    xpath: str,
275
    wait: WebDriverWait[WebDriver] | None = None,
276
    is_warn: bool = True,  # noqa: FBT001
277
    move: bool = False,  # noqa: FBT001
278
) -> bool:
279
    if wait is not None:
×
280
        wait.until(
×
281
            selenium.webdriver.support.expected_conditions.element_to_be_clickable(
282
                (selenium.webdriver.common.by.By.XPATH, xpath)
283
            )
284
        )
285
        time.sleep(0.05)
×
286

287
    if xpath_exists(driver, xpath):
×
288
        elem = driver.find_element(selenium.webdriver.common.by.By.XPATH, xpath)
×
289
        if move:
×
290
            action = selenium.webdriver.common.action_chains.ActionChains(driver)
×
291
            action.move_to_element(elem)
×
292
            action.perform()
×
293

294
        elem.click()
×
295
        return True
×
296
    else:
297
        if is_warn:
×
298
            logging.warning("Element is not found: %s", xpath)
×
299
        return False
×
300

301

302
def is_display(driver: WebDriver, xpath: str) -> bool:
1✔
303
    return (len(driver.find_elements(selenium.webdriver.common.by.By.XPATH, xpath)) != 0) and (
×
304
        driver.find_element(selenium.webdriver.common.by.By.XPATH, xpath).is_displayed()
305
    )
306

307

308
def random_sleep(sec: float) -> None:
1✔
309
    RATIO = 0.8
1✔
310

311
    time.sleep((sec * RATIO) + (sec * (1 - RATIO) * 2) * random.random())  # noqa: S311
1✔
312

313

314
def with_retry(
1✔
315
    func: Callable[[], T],
316
    max_retries: int = 3,
317
    delay: float = 1.0,
318
    exceptions: tuple[type[Exception], ...] = (Exception,),
319
    on_retry: Callable[[int, Exception], bool | None] | None = None,
320
) -> T:
321
    """リトライ付きで関数を実行
322

323
    全て失敗した場合は最後の例外を再スロー。
324
    呼び出し側で try/except してエラー処理を行う。
325

326
    Args:
327
        func: 実行する関数
328
        max_retries: 最大リトライ回数
329
        delay: リトライ間の待機秒数
330
        exceptions: リトライ対象の例外タプル
331
        on_retry: リトライ時のコールバック (attempt, exception)
332
            - None または True を返すとリトライを継続
333
            - False を返すとリトライを中止して例外を再スロー
334

335
    Returns:
336
        成功時は関数の戻り値
337

338
    Raises:
339
        最後の例外を再スロー
340

341
    """
342
    last_exception: Exception | None = None
×
343

344
    for attempt in range(max_retries):
×
345
        try:
×
346
            return func()
×
347
        except exceptions as e:
×
348
            last_exception = e
×
349
            if attempt < max_retries - 1:
×
350
                if on_retry:
×
351
                    should_continue = on_retry(attempt + 1, e)
×
352
                    if should_continue is False:
×
353
                        raise
×
354
                time.sleep(delay)
×
355

356
    if last_exception:
×
357
        raise last_exception
×
358
    raise RuntimeError("Unexpected state in with_retry")
×
359

360

361
def wait_patiently(
1✔
362
    driver: WebDriver,
363
    wait: WebDriverWait[WebDriver],
364
    target: Any,
365
) -> None:
366
    error: selenium.common.exceptions.TimeoutException | None = None
×
367
    for i in range(WAIT_RETRY_COUNT + 1):
×
368
        try:
×
369
            wait.until(target)
×
370
            return
×
371
        except selenium.common.exceptions.TimeoutException as e:
×
372
            logging.warning(
×
373
                "タイムアウトが発生しました。(%s in %s line %d)",
374
                inspect.stack()[1].function,
375
                inspect.stack()[1].filename,
376
                inspect.stack()[1].lineno,
377
            )
378
            error = e
×
379

380
            logging.info(i)
×
381
            if i != WAIT_RETRY_COUNT:
×
382
                logging.info("refresh")
×
383
                driver.refresh()
×
384

385
    if error is not None:
×
386
        raise error
×
387

388

389
def dump_page(
1✔
390
    driver: WebDriver,
391
    index: int,
392
    dump_path: pathlib.Path,
393
    stack_index: int = 1,
394
) -> None:
395
    name = inspect.stack()[stack_index].function.replace("<", "").replace(">", "")
×
396

397
    dump_path.mkdir(parents=True, exist_ok=True)
×
398

399
    png_path = dump_path / f"{name}_{index:02d}.png"
×
400
    htm_path = dump_path / f"{name}_{index:02d}.htm"
×
401

402
    driver.save_screenshot(str(png_path))
×
403

404
    with htm_path.open("w", encoding="utf-8") as f:
×
405
        f.write(driver.page_source)
×
406

407
    logging.info(
×
408
        "page dump: %02d from %s in %s line %d",
409
        index,
410
        inspect.stack()[stack_index].function,
411
        inspect.stack()[stack_index].filename,
412
        inspect.stack()[stack_index].lineno,
413
    )
414

415

416
def clear_cache(driver: WebDriver) -> None:
1✔
417
    driver.execute_cdp_cmd("Network.clearBrowserCache", {})
×
418

419

420
def set_japanese_locale(driver: WebDriver) -> None:
1✔
421
    """CDP を使って日本語ロケールを強制設定
422

423
    Chrome の起動オプションだけでは言語設定が変わってしまうことがあるため、
424
    CDP を使って Accept-Language ヘッダーとロケールを強制的に日本語に設定する。
425
    """
426
    try:
×
427
        # NOTE: Network.setExtraHTTPHeaders は Network.enable を先に呼ばないと機能しない
428
        driver.execute_cdp_cmd("Network.enable", {})
×
429
        driver.execute_cdp_cmd(
×
430
            "Network.setExtraHTTPHeaders",
431
            {"headers": {"Accept-Language": "ja-JP,ja;q=0.9"}},
432
        )
433
        driver.execute_cdp_cmd(
×
434
            "Emulation.setLocaleOverride",
435
            {"locale": "ja-JP"},
436
        )
437
        logging.debug("Japanese locale set via CDP")
×
438
    except Exception:
×
439
        logging.warning("Failed to set Japanese locale via CDP")
×
440

441

442
def clean_dump(dump_path: pathlib.Path, keep_days: int = 1) -> None:
1✔
443
    if not dump_path.exists():
1✔
444
        return
1✔
445

446
    time_threshold = datetime.timedelta(keep_days)
1✔
447

448
    for item in dump_path.iterdir():
1✔
449
        if not item.is_file():
1✔
450
            continue
1✔
451
        try:
1✔
452
            time_diff = datetime.datetime.now(datetime.UTC) - datetime.datetime.fromtimestamp(
1✔
453
                item.stat().st_mtime, datetime.UTC
454
            )
455
        except FileNotFoundError:
×
456
            # ファイルが別プロセスにより削除された場合(SQLiteの一時ファイルなど)
457
            continue
×
458
        if time_diff > time_threshold:
1✔
459
            logging.warning("remove %s [%s day(s) old].", item.absolute(), f"{time_diff.days:,}")
1✔
460

461
            item.unlink(missing_ok=True)
1✔
462

463

464
def get_memory_info(driver: WebDriver) -> dict[str, Any]:
1✔
465
    """ブラウザのメモリ使用量を取得(単位: KB)"""
466
    total_bytes = subprocess.Popen(  # noqa: S602
×
467
        "smem -t -c pss -P chrome | tail -n 1",  # noqa: S607
468
        shell=True,
469
        stdout=subprocess.PIPE,
470
    ).communicate()[0]
471
    total = int(str(total_bytes, "utf-8").strip())  # smem の出力は KB 単位
×
472

473
    try:
×
474
        memory_info = driver.execute_cdp_cmd("Memory.getAllTimeSamplingProfile", {})
×
475
        heap_usage = driver.execute_cdp_cmd("Runtime.getHeapUsage", {})
×
476

477
        heap_used = heap_usage.get("usedSize", 0) // 1024  # bytes → KB
×
478
        heap_total = heap_usage.get("totalSize", 0) // 1024  # bytes → KB
×
479
    except Exception as e:
×
480
        logging.debug("Failed to get memory usage: %s", e)
×
481

482
        memory_info = None
×
483
        heap_used = 0
×
484
        heap_total = 0
×
485

486
    return {
×
487
        "total": total,
488
        "heap_used": heap_used,
489
        "heap_total": heap_total,
490
        "memory_info": memory_info,
491
    }
492

493

494
def log_memory_usage(driver: WebDriver) -> None:
1✔
495
    mem_info = get_memory_info(driver)
×
496
    logging.info(
×
497
        "Chrome memory: %s MB (JS heap: %s MB)",
498
        f"""{mem_info["total"] // 1024:,}""",
499
        f"""{mem_info["heap_used"] // 1024:,}""",
500
    )
501

502

503
def _warmup(
1✔
504
    driver: WebDriver,
505
    keyword: str,
506
    url_pattern: str,
507
    sleep_sec: int = 3,
508
) -> None:
509
    # NOTE: ダミーアクセスを行って BOT ではないと思わせる。(効果なさそう...)
510
    driver.get("https://www.yahoo.co.jp/")
×
511
    time.sleep(sleep_sec)
×
512

513
    driver.find_element(selenium.webdriver.common.by.By.XPATH, '//input[@name="p"]').send_keys(keyword)
×
514
    driver.find_element(selenium.webdriver.common.by.By.XPATH, '//input[@name="p"]').send_keys(
×
515
        selenium.webdriver.common.keys.Keys.ENTER
516
    )
517

518
    time.sleep(sleep_sec)
×
519

520
    driver.find_element(
×
521
        selenium.webdriver.common.by.By.XPATH, f'//a[contains(@href, "{url_pattern}")]'
522
    ).click()
523

524
    time.sleep(sleep_sec)
×
525

526

527
class browser_tab:  # noqa: N801
1✔
528
    """新しいブラウザタブで URL を開くコンテキストマネージャ"""
529

530
    def __init__(self, driver: WebDriver, url: str) -> None:
1✔
531
        """初期化
532

533
        Args:
534
            driver: WebDriver インスタンス
535
            url: 開く URL
536

537
        """
538
        self.driver = driver
1✔
539
        self.url = url
1✔
540
        self.original_window: str | None = None
1✔
541

542
    def __enter__(self) -> None:
1✔
543
        """新しいタブを開いて URL にアクセス"""
544
        self.original_window = self.driver.current_window_handle
1✔
545
        self.driver.execute_script("window.open('');")
1✔
546
        self.driver.switch_to.window(self.driver.window_handles[-1])
1✔
547
        try:
1✔
548
            self.driver.get(self.url)
1✔
549
        except Exception:
×
550
            # NOTE: URL読み込みに失敗した場合もクリーンアップしてから例外を再送出
551
            self._cleanup()
×
552
            raise
×
553

554
    def _cleanup(self) -> None:
1✔
555
        """タブを閉じて元のウィンドウに戻る"""
556
        try:
1✔
557
            # 余分なタブを閉じる
558
            while len(self.driver.window_handles) > 1:
1✔
559
                self.driver.switch_to.window(self.driver.window_handles[-1])
1✔
560
                self.driver.close()
1✔
561
            if self.original_window is not None:
1✔
562
                self.driver.switch_to.window(self.original_window)
1✔
563
            time.sleep(0.1)
1✔
564
        except Exception:
1✔
565
            # NOTE: Chromeがクラッシュした場合は無視(既に終了しているため操作不可)
566
            logging.exception("タブのクリーンアップに失敗しました(Chromeがクラッシュした可能性があります)")
1✔
567

568
    def _recover_from_error(self) -> None:
1✔
569
        """エラー後にブラウザの状態を回復する"""
570
        try:
×
571
            # ページロードタイムアウトをリセット(負の値になっている可能性があるため)
572
            self.driver.set_page_load_timeout(30)
×
573

574
            # about:blank に移動してレンダラーの状態をリセット
575
            self.driver.get("about:blank")
×
576
            time.sleep(0.5)
×
577
        except Exception:
×
578
            logging.warning("ブラウザの回復に失敗しました")
×
579

580
    def __exit__(
1✔
581
        self,
582
        exception_type: type[BaseException] | None,
583
        exception_value: BaseException | None,
584
        traceback: types.TracebackType | None,
585
    ) -> None:
586
        """タブを閉じて元のウィンドウに戻る"""
587
        self._cleanup()
1✔
588

589
        # 例外が発生した場合はブラウザの状態を回復
590
        if exception_type is not None:
1✔
591
            self._recover_from_error()
×
592

593

594
class error_handler:  # noqa: N801
1✔
595
    """Selenium操作時のエラーハンドリング用コンテキストマネージャ
596

597
    エラー発生時に自動でログ出力、スクリーンショット取得、コールバック呼び出しを行う。
598

599
    Args:
600
        driver: WebDriver インスタンス
601
        message: ログに出力するエラーメッセージ
602
        on_error: エラー時に呼ばれるコールバック関数 (exception, screenshot: PIL.Image.Image | None) -> None
603
        capture_screenshot: スクリーンショットを自動取得するか(デフォルト: True)
604
        reraise: 例外を再送出するか(デフォルト: True)
605

606
    Attributes:
607
        exception: 発生した例外(エラーがなければ None)
608
        screenshot: 取得したスクリーンショット(PIL.Image.Image、取得失敗時は None)
609

610
    Examples:
611
        基本的な使用方法::
612

613
            with my_lib.selenium_util.error_handler(driver, message="ログイン処理に失敗") as handler:
614
                driver.get(login_url)
615
                driver.find_element(...).click()
616

617
        コールバック付き(Slack通知など)::
618

619
            def notify(exc, screenshot):
620
                slack.error("エラー発生", str(exc), screenshot)
621

622
            with my_lib.selenium_util.error_handler(
623
                driver,
624
                message="クロール処理に失敗",
625
                on_error=notify,
626
            ):
627
                crawl_page(driver)
628

629
        例外を抑制して続行::
630

631
            with my_lib.selenium_util.error_handler(driver, reraise=False) as handler:
632
                risky_operation()
633

634
            if handler.exception:
635
                logging.warning("処理をスキップしました")
636

637
    """
638

639
    def __init__(
1✔
640
        self,
641
        driver: WebDriver,
642
        message: str = "Selenium operation failed",
643
        on_error: Callable[[Exception, PIL.Image.Image | None], None] | None = None,
644
        capture_screenshot: bool = True,  # noqa: FBT001
645
        reraise: bool = True,  # noqa: FBT001
646
    ) -> None:
647
        """初期化"""
648
        self.driver = driver
1✔
649
        self.message = message
1✔
650
        self.on_error = on_error
1✔
651
        self.capture_screenshot = capture_screenshot
1✔
652
        self.reraise = reraise
1✔
653
        self.exception: Exception | None = None
1✔
654
        self.screenshot: PIL.Image.Image | None = None
1✔
655

656
    def __enter__(self) -> Self:
1✔
657
        """コンテキストマネージャの開始"""
658
        return self
1✔
659

660
    def __exit__(
1✔
661
        self,
662
        exception_type: type[BaseException] | None,
663
        exception_value: BaseException | None,
664
        traceback: types.TracebackType | None,
665
    ) -> bool:
666
        """コンテキストマネージャの終了、エラー処理を実行"""
667
        if exception_value is None:
1✔
668
            return False
1✔
669

670
        # 例外を記録
671
        if isinstance(exception_value, Exception):
1✔
672
            self.exception = exception_value
1✔
673
        else:
674
            # BaseException(KeyboardInterrupt など)は処理せず再送出
675
            return False
×
676

677
        # ログ出力
678
        logging.exception(self.message)
1✔
679

680
        # スクリーンショット取得
681
        if self.capture_screenshot:
1✔
682
            try:
1✔
683
                screenshot_bytes = self.driver.get_screenshot_as_png()
1✔
684
                self.screenshot = PIL.Image.open(io.BytesIO(screenshot_bytes))
1✔
685
            except Exception:
1✔
686
                logging.debug("Failed to capture screenshot for error handling")
1✔
687

688
        # コールバック呼び出し
689
        if self.on_error is not None:
1✔
690
            try:
1✔
691
                self.on_error(self.exception, self.screenshot)
1✔
692
            except Exception:
×
693
                logging.exception("Error in on_error callback")
×
694

695
        # reraise=False なら例外を抑制
696
        return not self.reraise
1✔
697

698

699
def _is_chrome_related_process(process: psutil.Process) -> bool:
1✔
700
    """プロセスがChrome関連かどうかを判定"""
701
    try:
1✔
702
        process_name = process.name().lower()
1✔
703
        # Chrome関連のプロセス名パターン
704
        chrome_patterns = ["chrome", "chromium", "google-chrome", "undetected_chro"]
1✔
705
        # chromedriverは除外
706
        if "chromedriver" in process_name:
1✔
707
            return False
1✔
708
        return any(pattern in process_name for pattern in chrome_patterns)
1✔
709
    except (psutil.NoSuchProcess, psutil.AccessDenied):
1✔
710
        return False
1✔
711

712

713
def _get_chrome_processes_by_pgid(chromedriver_pid: int, existing_pids: set[int]) -> list[int]:
1✔
714
    """プロセスグループIDで追加のChrome関連プロセスを取得"""
715
    additional_pids = []
×
716
    try:
×
717
        pgid = os.getpgid(chromedriver_pid)
×
718
        for proc in psutil.process_iter(["pid", "name", "ppid"]):
×
719
            if proc.info["pid"] in existing_pids:
×
720
                continue
×
721
            try:
×
722
                if os.getpgid(proc.info["pid"]) == pgid:
×
723
                    proc_obj = psutil.Process(proc.info["pid"])
×
724
                    if _is_chrome_related_process(proc_obj):
×
725
                        additional_pids.append(proc.info["pid"])
×
726
                        logging.debug(
×
727
                            "Found Chrome-related process by pgid: PID %d, name: %s",
728
                            proc.info["pid"],
729
                            proc.info["name"],
730
                        )
731
            except (psutil.NoSuchProcess, psutil.AccessDenied, OSError):
×
732
                pass
×
733
    except (OSError, psutil.NoSuchProcess):
×
734
        logging.debug("Failed to get process group ID for chromedriver")
×
735
    return additional_pids
×
736

737

738
def _get_chrome_related_processes(driver: WebDriver) -> list[int]:
1✔
739
    """Chrome関連の全子プロセスを取得
740

741
    undetected_chromedriver 使用時、Chrome プロセスは chromedriver の子ではなく
742
    Python プロセスの直接の子として起動されることがあるため、両方を検索する。
743
    """
744
    chrome_pids = set()
×
745

746
    # 1. driver.service.process の子プロセスを検索
747
    try:
×
748
        if hasattr(driver, "service") and driver.service and hasattr(driver.service, "process"):  # type: ignore[attr-defined]
×
749
            process = driver.service.process  # type: ignore[attr-defined]
×
750
            if process and hasattr(process, "pid"):
×
751
                chromedriver_pid = process.pid
×
752

753
                # psutilでプロセス階層を取得
754
                parent_process = psutil.Process(chromedriver_pid)
×
755
                children = parent_process.children(recursive=True)
×
756

757
                for child in children:
×
758
                    chrome_pids.add(child.pid)
×
759
                    logging.debug(
×
760
                        "Found Chrome-related process (service child): PID %d, name: %s",
761
                        child.pid,
762
                        child.name(),
763
                    )
764
    except Exception:
×
765
        logging.exception("Failed to get Chrome-related processes from service")
×
766

767
    # 2. 現在の Python プロセスの全子孫から Chrome 関連プロセスを検索
768
    try:
×
769
        current_process = psutil.Process()
×
770
        all_children = current_process.children(recursive=True)
×
771

772
        for child in all_children:
×
773
            if child.pid in chrome_pids:
×
774
                continue
×
775
            try:
×
776
                if _is_chrome_related_process(child):
×
777
                    chrome_pids.add(child.pid)
×
778
                    logging.debug(
×
779
                        "Found Chrome-related process (python child): PID %d, name: %s",
780
                        child.pid,
781
                        child.name(),
782
                    )
783
            except (psutil.NoSuchProcess, psutil.AccessDenied):
×
784
                pass
×
785
    except Exception:
×
786
        logging.exception("Failed to get Chrome-related processes from python children")
×
787

788
    return list(chrome_pids)
×
789

790

791
def _send_signal_to_processes(pids: list[int], sig: signal.Signals, signal_name: str) -> None:
1✔
792
    """プロセスリストに指定されたシグナルを送信"""
793
    errors = []
×
794
    for pid in pids:
×
795
        try:
×
796
            # プロセス名を取得
797
            try:
×
798
                process = psutil.Process(pid)
×
799
                process_name = process.name()
×
800
            except (psutil.NoSuchProcess, psutil.AccessDenied):
×
801
                process_name = "unknown"
×
802

803
            if sig == signal.SIGKILL:
×
804
                # プロセスがまだ存在するかチェック
805
                os.kill(pid, 0)  # シグナル0は存在確認
×
806
            os.kill(pid, sig)
×
807
            logging.info("Sent %s to process: PID %d (%s)", signal_name, pid, process_name)
×
808
        except (ProcessLookupError, OSError) as e:
×
809
            # プロセスが既に終了している場合は無視
810
            errors.append((pid, e))
×
811

812
    # エラーが発生した場合はまとめてログ出力
813
    if errors:
×
814
        logging.debug("Failed to send %s to some processes: %s", signal_name, errors)
×
815

816

817
def _terminate_chrome_processes(chrome_pids: list[int], timeout: float = 5.0) -> None:
1✔
818
    """Chrome関連プロセスを段階的に終了
819

820
    Args:
821
        chrome_pids: 終了対象のプロセスIDリスト
822
        timeout: SIGTERM後にプロセス終了を待機する最大時間(秒)
823

824
    """
825
    if not chrome_pids:
×
826
        return
×
827

828
    # 優雅な終了(SIGTERM)
829
    _send_signal_to_processes(chrome_pids, signal.SIGTERM, "SIGTERM")
×
830

831
    # プロセスの終了を待機(ポーリング)
832
    remaining_pids = list(chrome_pids)
×
833
    poll_interval = 0.2
×
834
    elapsed = 0.0
×
835

836
    while remaining_pids and elapsed < timeout:
×
837
        time.sleep(poll_interval)
×
838
        elapsed += poll_interval
×
839

840
        # まだ生存しているプロセスをチェック
841
        still_alive = []
×
842
        for pid in remaining_pids:
×
843
            try:
×
844
                if psutil.pid_exists(pid):
×
845
                    process = psutil.Process(pid)
×
846
                    if process.is_running() and process.status() != psutil.STATUS_ZOMBIE:
×
847
                        still_alive.append(pid)
×
848
            except (psutil.NoSuchProcess, psutil.AccessDenied):
×
849
                pass
×
850

851
        remaining_pids = still_alive
×
852

853
    # タイムアウト後もまだ残っているプロセスにのみ SIGKILL を送信
854
    if remaining_pids:
×
855
        logging.warning(
×
856
            "Chrome processes still alive after %.1fs, sending SIGKILL to %d processes",
857
            elapsed,
858
            len(remaining_pids),
859
        )
860
        _send_signal_to_processes(remaining_pids, signal.SIGKILL, "SIGKILL")
×
861

862

863
def _reap_single_process(pid: int) -> None:
1✔
864
    """単一プロセスをwaitpidで回収"""
865
    try:
×
866
        # ノンブロッキングでwaitpid
867
        result_pid, status = os.waitpid(pid, os.WNOHANG)
×
868
        if result_pid == pid:
×
869
            # プロセス名を取得
870
            try:
×
871
                process = psutil.Process(pid)
×
872
                process_name = process.name()
×
873
            except (psutil.NoSuchProcess, psutil.AccessDenied):
×
874
                process_name = "unknown"
×
875
            logging.debug("Reaped Chrome process: PID %d (%s)", pid, process_name)
×
876
    except (ChildProcessError, OSError):
×
877
        # 子プロセスでない場合や既に回収済みの場合は無視
878
        pass
×
879

880

881
def _reap_chrome_processes(chrome_pids: list[int]) -> None:
1✔
882
    """Chrome関連プロセスを明示的に回収してゾンビ化を防ぐ"""
883
    for pid in chrome_pids:
×
884
        _reap_single_process(pid)
×
885

886

887
def _get_remaining_chrome_pids(chrome_pids: list[int]) -> list[int]:
1✔
888
    """指定されたPIDリストから、まだ生存しているChrome関連プロセスを取得"""
889
    remaining = []
×
890
    for pid in chrome_pids:
×
891
        try:
×
892
            if psutil.pid_exists(pid):
×
893
                process = psutil.Process(pid)
×
894
                if (
×
895
                    process.is_running()
896
                    and process.status() != psutil.STATUS_ZOMBIE
897
                    and _is_chrome_related_process(process)
898
                ):
899
                    remaining.append(pid)
×
900
        except (psutil.NoSuchProcess, psutil.AccessDenied):
×
901
            pass
×
902
    return remaining
×
903

904

905
def _wait_for_processes_with_check(
1✔
906
    chrome_pids: list[int],
907
    timeout: float,
908
    poll_interval: float = 0.2,
909
    log_interval: float = 1.0,
910
) -> list[int]:
911
    """プロセスの終了を待機しつつ、残存プロセスをチェック
912

913
    Args:
914
        chrome_pids: 監視対象のプロセスIDリスト
915
        timeout: 最大待機時間(秒)
916
        poll_interval: チェック間隔(秒)
917
        log_interval: ログ出力間隔(秒)
918

919
    Returns:
920
        タイムアウト後も残存しているプロセスIDのリスト
921

922
    """
923
    elapsed = 0.0
×
924
    last_log_time = 0.0
×
925
    remaining_pids = list(chrome_pids)
×
926

927
    while remaining_pids and elapsed < timeout:
×
928
        time.sleep(poll_interval)
×
929
        elapsed += poll_interval
×
930
        remaining_pids = _get_remaining_chrome_pids(remaining_pids)
×
931

932
        if remaining_pids and (elapsed - last_log_time) >= log_interval:
×
933
            logging.info(
×
934
                "Found %d remaining Chrome processes after %.0fs",
935
                len(remaining_pids),
936
                elapsed,
937
            )
938
            last_log_time = elapsed
×
939

940
    return remaining_pids
×
941

942

943
def quit_driver_gracefully(  # noqa: C901
1✔
944
    driver: WebDriver | None,
945
    wait_sec: float = 5.0,
946
    sigterm_wait_sec: float = 5.0,
947
    sigkill_wait_sec: float = 5.0,
948
) -> None:
949
    """Chrome WebDriverを確実に終了する
950

951
    終了フロー:
952
    1. driver.quit() を呼び出し
953
    2. wait_sec 秒待機しつつプロセス終了をチェック
954
    3. 残存プロセスがあれば SIGTERM を送信
955
    4. sigterm_wait_sec 秒待機しつつプロセス終了をチェック
956
    5. 残存プロセスがあれば SIGKILL を送信
957
    6. sigkill_wait_sec 秒待機
958

959
    Args:
960
        driver: 終了する WebDriver インスタンス
961
        wait_sec: quit 後にプロセス終了を待機する秒数(デフォルト: 5秒)
962
        sigterm_wait_sec: SIGTERM 後にプロセス終了を待機する秒数(デフォルト: 5秒)
963
        sigkill_wait_sec: SIGKILL 後にプロセス回収を待機する秒数(デフォルト: 5秒)
964

965
    """
966
    if driver is None:
1✔
967
        return
1✔
968

969
    # quit前にChrome関連プロセスを記録
970
    chrome_pids_before = _get_chrome_related_processes(driver)
1✔
971

972
    try:
1✔
973
        # WebDriverの正常終了を試行(これがタブのクローズも含む)
974
        driver.quit()
1✔
975
        logging.info("WebDriver quit successfully")
1✔
976
    except Exception:
1✔
977
        logging.warning("Failed to quit driver normally", exc_info=True)
1✔
978
    finally:
979
        # undetected_chromedriver の __del__ がシャットダウン時に再度呼ばれるのを防ぐ
980
        if hasattr(driver, "_has_quit"):
1✔
981
            driver._has_quit = True  # type: ignore[attr-defined]  # noqa: SLF001
1✔
982

983
    # ChromeDriverサービスの停止を試行
984
    try:
1✔
985
        if hasattr(driver, "service") and driver.service and hasattr(driver.service, "stop"):  # type: ignore[attr-defined]
1✔
986
            driver.service.stop()  # type: ignore[attr-defined]
×
987
    except (ConnectionResetError, OSError):
×
988
        # Chrome が既に終了している場合は無視
989
        logging.debug("Chrome service already stopped")
×
990
    except Exception:
×
991
        logging.warning("Failed to stop Chrome service", exc_info=True)
×
992

993
    # Step 1: quit 後に wait_sec 秒待機しつつプロセス終了をチェック
994
    remaining_pids = _wait_for_processes_with_check(chrome_pids_before, wait_sec)
1✔
995

996
    if not remaining_pids:
1✔
997
        logging.debug("All Chrome processes exited normally")
1✔
998
        return
1✔
999

1000
    # Step 2: 残存プロセスに SIGTERM を送信
1001
    logging.info(
×
1002
        "Found %d remaining Chrome processes after %.0fs, sending SIGTERM",
1003
        len(remaining_pids),
1004
        wait_sec,
1005
    )
1006
    _send_signal_to_processes(remaining_pids, signal.SIGTERM, "SIGTERM")
×
1007

1008
    # Step 3: SIGTERM 後に sigterm_wait_sec 秒待機しつつプロセス終了をチェック
1009
    remaining_pids = _wait_for_processes_with_check(remaining_pids, sigterm_wait_sec)
×
1010

1011
    if not remaining_pids:
×
1012
        logging.info("All Chrome processes exited after SIGTERM")
×
1013
        _reap_chrome_processes(chrome_pids_before)
×
1014
        return
×
1015

1016
    # Step 4: 残存プロセスに SIGKILL を送信
1017
    logging.warning(
×
1018
        "Chrome processes still alive after SIGTERM + %.1fs, sending SIGKILL to %d processes",
1019
        sigterm_wait_sec,
1020
        len(remaining_pids),
1021
    )
1022
    _send_signal_to_processes(remaining_pids, signal.SIGKILL, "SIGKILL")
×
1023

1024
    # Step 5: SIGKILL 後に sigkill_wait_sec 秒待機してプロセス回収
1025
    time.sleep(sigkill_wait_sec)
×
1026
    _reap_chrome_processes(chrome_pids_before)
×
1027

1028
    # 最終チェック:まだ残っているプロセスがあるか確認
1029
    still_remaining = _get_remaining_chrome_pids(remaining_pids)
×
1030

1031
    # 回収できなかったプロセスについて警告
1032
    if still_remaining:
×
1033
        for pid in still_remaining:
×
1034
            try:
×
1035
                process = psutil.Process(pid)
×
1036
                logging.warning("Failed to collect Chrome-related process: PID %d (%s)", pid, process.name())
×
1037
            except (psutil.NoSuchProcess, psutil.AccessDenied):
×
1038
                pass
×
1039

1040

1041
if __name__ == "__main__":
1042
    import pathlib
1043

1044
    import docopt
1045
    import selenium.webdriver.support.wait
1046

1047
    import my_lib.config
1048
    import my_lib.logger
1049

1050
    assert __doc__ is not None  # noqa: S101
1051
    args = docopt.docopt(__doc__)
1052

1053
    config_file = args["-c"]
1054
    debug_mode = args["-D"]
1055

1056
    my_lib.logger.init("test", level=logging.DEBUG if debug_mode else logging.INFO)
1057

1058
    config = my_lib.config.load(config_file)
1059

1060
    driver = create_driver("test", pathlib.Path(config["data"]["selenium"]))
1061
    wait = selenium.webdriver.support.wait.WebDriverWait(driver, 5)
1062

1063
    driver.get("https://www.google.com/")
1064
    wait.until(
1065
        selenium.webdriver.support.expected_conditions.presence_of_element_located(
1066
            (selenium.webdriver.common.by.By.XPATH, '//input[contains(@value, "Google")]')
1067
        )
1068
    )
1069

1070
    quit_driver_gracefully(driver)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc