• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kimata / my-py-lib / 20613793761

31 Dec 2025 06:44AM UTC coverage: 65.509% (-0.1%) from 65.61%
20613793761

push

github

kimata
fix: browser_tab テストの無限ループを修正

_cleanup メソッドの while ループで window_handles が
固定値のままだったため無限ループになっていた問題を修正。

PropertyMock で window_handles を動的に返し、close() 時に
リストを更新するように変更。

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

2923 of 4462 relevant lines covered (65.51%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

41.83
/src/my_lib/selenium_util.py
1
#!/usr/bin/env python3
2
"""
3
Selenium を Chrome Driver を使って動かします。
4

5
Usage:
6
  selenium_util.py [-c CONFIG] [-D]
7

8
Options:
9
  -c CONFIG         : CONFIG を設定ファイルとして読み込んで実行します。[default: tests/data/config.example.yaml]
10
  -D                : デバッグモードで動作します。
11
"""
12

13
from __future__ import annotations
1✔
14

15
import datetime
1✔
16
import inspect
1✔
17
import json
1✔
18
import logging
1✔
19
import os
1✔
20
import pathlib
1✔
21
import random
1✔
22
import re
1✔
23
import shutil
1✔
24
import signal
1✔
25
import sqlite3
1✔
26
import subprocess
1✔
27
import time
1✔
28
import io
1✔
29
from collections.abc import Callable
1✔
30
from dataclasses import dataclass
1✔
31
from typing import TYPE_CHECKING, Any
1✔
32

33
import PIL.Image
1✔
34
import psutil
1✔
35
import selenium
1✔
36
import selenium.common.exceptions
1✔
37
import selenium.webdriver.chrome.options
1✔
38
import selenium.webdriver.chrome.service
1✔
39
import selenium.webdriver.common.action_chains
1✔
40
import selenium.webdriver.common.by
1✔
41
import selenium.webdriver.common.keys
1✔
42
import selenium.webdriver.support.expected_conditions
1✔
43
import undetected_chromedriver
1✔
44

45
if TYPE_CHECKING:
46
    from selenium.webdriver.remote.webdriver import WebDriver
47
    from selenium.webdriver.support.wait import WebDriverWait
48

49
WAIT_RETRY_COUNT: int = 1
1✔
50

51

52
class SeleniumError(Exception):
    """Base class for Selenium-related errors raised by this module."""
54

55

56
def _get_chrome_version() -> int | None:
    """Return the major version reported by ``google-chrome --version``.

    The first run of digits that is followed by a dot in the command's
    standard output is taken as the major version.

    Returns:
        The major version as an int, or None when the command could not be
        run or its output contains no version number.
    """
    command = ["google-chrome", "--version"]
    try:
        completed = subprocess.run(command, capture_output=True, text=True, timeout=10)
        found = re.search(r"(\d+)\.", completed.stdout)
        if found:
            return int(found.group(1))
    except Exception:
        # Best effort: any failure (missing binary, timeout, ...) only produces a warning.
        logging.warning("Failed to detect Chrome version")
    return None
1✔
70

71

72
def _create_driver_impl(
    profile_name: str,
    data_path: pathlib.Path,
    is_headless: bool,
    use_subprocess: bool = True,
) -> WebDriver:  # noqa: ARG001
    """Build an undetected-chromedriver Chrome instance for one profile.

    Args:
        profile_name: Base profile name. When ``PYTEST_XDIST_WORKER`` is set,
            its value is appended after a dot.
        data_path: Directory holding the ``chrome`` (profiles) and ``log``
            sub-directories; both are created when missing.
        is_headless: Pass ``--headless=new`` when True; otherwise DevTools is
            opened automatically for each tab.
        use_subprocess: Forwarded to ``undetected_chromedriver.Chrome``.

    Returns:
        The created driver, with its page-load timeout set to 30 seconds.
    """
    chrome_data_path = data_path / "chrome"
    log_path = data_path / "log"

    # NOTE: Give every pytest-xdist worker its own profile so tests can run in parallel.
    xdist_worker = os.environ.get("PYTEST_XDIST_WORKER", None)
    if xdist_worker is not None:
        profile_name += "." + xdist_worker

    for directory in (chrome_data_path, log_path):
        directory.mkdir(parents=True, exist_ok=True)

    options = selenium.webdriver.chrome.options.Options()

    leading_args = ["--headless=new"] if is_headless else []
    leading_args += [
        "--no-sandbox",  # for Docker
        "--disable-dev-shm-usage",  # for Docker
        "--disable-gpu",
        "--disable-popup-blocking",
        "--disable-plugins",
        "--no-first-run",
        "--lang=ja-JP",
        "--window-size=1920,1080",
    ]
    for argument in leading_args:
        options.add_argument(argument)

    # NOTE: Make the Accept-Language header prefer Japanese.
    options.add_experimental_option("prefs", {"intl.accept_languages": "ja-JP,ja,en-US,en"})

    chrome_log_file = log_path / f"chrome_{profile_name}.log"
    trailing_args = [
        "--user-data-dir=" + str(chrome_data_path / profile_name),
        "--enable-logging",
        "--v=1",
        f"--log-file={chrome_log_file}",
    ]
    if not is_headless:
        trailing_args.append("--auto-open-devtools-for-tabs")
    for argument in trailing_args:
        options.add_argument(argument)

    webdriver_log = log_path / "webdriver.log"
    service = selenium.webdriver.chrome.service.Service(
        service_args=["--verbose", f"--log-path={webdriver_log}"],
    )

    chrome_version = _get_chrome_version()

    # NOTE: user_multi_procs=True assumes a chromedriver file already exists, so it is
    # only enabled when one is found (it is absent e.g. on the first run in CI).
    uc_data_path = pathlib.Path("~/.local/share/undetected_chromedriver").expanduser()
    use_multi_procs = uc_data_path.exists() and any(uc_data_path.glob("*chromedriver*"))

    driver = undetected_chromedriver.Chrome(
        service=service,
        options=options,
        use_subprocess=use_subprocess,
        version_main=chrome_version,
        user_multi_procs=use_multi_procs,
    )
    driver.set_page_load_timeout(30)
    return driver
×
142

143

144
@dataclass
class _ProfileHealthResult:
    """Outcome of a Chrome profile health check."""

    # True when no problem was recorded.
    is_healthy: bool
    # One human-readable message per problem found.
    errors: list[str]
    # At least one Singleton* lock file was present.
    has_lock_files: bool = False
    # A JSON file of the profile could not be read or parsed.
    has_corrupted_json: bool = False
    # A SQLite database of the profile failed its check.
    has_corrupted_db: bool = False
1✔
153

154

155
def _check_json_file(file_path: pathlib.Path) -> str | None:
    """Verify that a file, if present, contains parseable UTF-8 JSON.

    Args:
        file_path: File to inspect. A missing file is not an error.

    Returns:
        An error message, or None when the file is missing or parses cleanly.
    """
    if not file_path.exists():
        return None

    try:
        json.loads(file_path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as exc:
        return f"{file_path.name} is corrupted: {exc}"
    except Exception as exc:
        # Anything other than a JSON syntax problem (I/O error, bad encoding, ...).
        return f"{file_path.name} read error: {exc}"
    return None
×
172

173

174
def _check_sqlite_db(db_path: pathlib.Path) -> str | None:
    """Run ``PRAGMA integrity_check`` against a SQLite database file.

    Args:
        db_path: Database file to inspect. A missing file is not an error.

    Returns:
        An error message, or None when the file is missing or the check
        reports ``ok``.
    """
    if not db_path.exists():
        return None

    try:
        conn = sqlite3.connect(str(db_path), timeout=5)
        try:
            result = conn.execute("PRAGMA integrity_check").fetchone()
        finally:
            # Always release the handle, including when the query raises on a
            # corrupted file; otherwise the connection would stay open.
            conn.close()
        if result[0] != "ok":
            return f"{db_path.name} database is corrupted: {result[0]}"
        return None
    except sqlite3.DatabaseError as e:
        return f"{db_path.name} database error: {e}"
    except Exception as e:
        return f"{db_path.name} check error: {e}"
×
194

195

196
def _check_profile_health(profile_path: pathlib.Path) -> _ProfileHealthResult:
    """Inspect a Chrome profile directory for lock files and corrupted data.

    Args:
        profile_path: Directory of the Chrome profile.

    Returns:
        A ``_ProfileHealthResult`` listing every problem found. A profile that
        does not exist yet is reported as healthy (it will be created fresh).
    """
    if not profile_path.exists():
        return _ProfileHealthResult(is_healthy=True, errors=[])

    problems: list[str] = []
    json_corrupted = False
    db_corrupted = False

    # 1. Lock files (a dangling symlink also counts as a lock).
    locks_present = [
        lock_name
        for lock_name in ("SingletonLock", "SingletonSocket", "SingletonCookie")
        if (profile_path / lock_name).exists() or (profile_path / lock_name).is_symlink()
    ]
    if locks_present:
        problems.append(f"Lock files exist: {', '.join(locks_present)}")

    # 2. "Local State" must be valid JSON.
    local_state_problem = _check_json_file(profile_path / "Local State")
    if local_state_problem:
        problems.append(local_state_problem)
        json_corrupted = True

    default_dir = profile_path / "Default"
    if default_dir.exists():
        # 3. "Preferences" must be valid JSON.
        prefs_problem = _check_json_file(default_dir / "Preferences")
        if prefs_problem:
            problems.append(prefs_problem)
            json_corrupted = True

        # 4. Integrity of the SQLite databases.
        for db_name in ("Cookies", "History", "Web Data"):
            db_problem = _check_sqlite_db(default_dir / db_name)
            if db_problem:
                problems.append(db_problem)
                db_corrupted = True

    return _ProfileHealthResult(
        is_healthy=len(problems) == 0,
        errors=problems,
        has_lock_files=len(locks_present) > 0,
        has_corrupted_json=json_corrupted,
        has_corrupted_db=db_corrupted,
    )
256

257

258
def _recover_corrupted_profile(profile_path: pathlib.Path) -> bool:
    """Move a corrupted profile aside so that a fresh one can be created.

    The directory is renamed to ``<name>.corrupted.<YYYYmmdd_HHMMSS>`` next to
    its original location.

    Args:
        profile_path: Directory of the Chrome profile.

    Returns:
        True when the profile is absent or was moved, False when the move failed.
    """
    if not profile_path.exists():
        return True

    # Timestamped backup destination.
    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_path = profile_path.parent / f"{profile_path.name}.corrupted.{stamp}"

    try:
        shutil.move(str(profile_path), str(backup_path))
    except Exception as e:
        logging.exception("Failed to backup corrupted profile: %s", e)
        return False

    logging.warning(
        "Corrupted profile moved to backup: %s -> %s",
        profile_path,
        backup_path,
    )
    return True
×
285

286

287
def _cleanup_profile_lock(profile_path: pathlib.Path) -> None:
    """Delete the Chrome ``Singleton*`` lock files found in a profile directory.

    Args:
        profile_path: Directory of the Chrome profile.
    """
    candidates = [
        profile_path / lock_name for lock_name in ("SingletonLock", "SingletonSocket", "SingletonCookie")
    ]
    # A dangling symlink fails exists() but still has to be removed.
    stale = [path for path in candidates if path.exists() or path.is_symlink()]
    if not stale:
        return

    logging.warning("Profile lock files found: %s", ", ".join(str(p.name) for p in stale))
    for path in stale:
        try:
            path.unlink()
        except OSError as e:
            logging.warning("Failed to remove lock file %s: %s", path, e)
×
303

304

305
def _is_running_in_container() -> bool:
    """Report whether ``/.dockerenv`` exists, which is taken to mean a container."""
    docker_marker = "/.dockerenv"
    return os.path.exists(docker_marker)
1✔
308

309

310
def _cleanup_orphaned_chrome_processes_in_container() -> None:
    """Send SIGTERM to leftover Chrome processes, but only inside a container.

    NOTE: Terminating every process by name, regardless of the process tree,
    is restricted to containers.
    """
    if not _is_running_in_container():
        return

    for proc in psutil.process_iter(["pid", "name"]):
        try:
            lowered_name = (proc.info["name"] or "").lower()
            if "chrome" not in lowered_name:
                continue
            pid = proc.info["pid"]
            logging.info("Terminating orphaned Chrome process: PID %d", pid)
            os.kill(pid, signal.SIGTERM)
        except (psutil.NoSuchProcess, psutil.AccessDenied, ProcessLookupError, OSError):
            # The process vanished or cannot be signalled; nothing more to do.
            pass

    # Pause for a second after the SIGTERMs have been sent.
    time.sleep(1)
×
327

328

329
def _get_actual_profile_name(profile_name: str) -> str:
    """Return the profile name with the ``PYTEST_XDIST_WORKER`` suffix applied.

    Args:
        profile_name: Base profile name.

    Returns:
        ``profile_name`` unchanged when the variable is unset, otherwise
        ``profile_name`` followed by a dot and the variable's value.
    """
    worker = os.environ.get("PYTEST_XDIST_WORKER", None)
    if worker is None:
        return profile_name + ""
    return profile_name + "." + worker
×
333

334

335
def delete_profile(profile_name: str, data_path: pathlib.Path) -> bool:
    """Delete a Chrome profile directory.

    Args:
        profile_name: Profile name (the pytest-xdist worker suffix is applied).
        data_path: Data directory that contains the ``chrome`` sub-directory.

    Returns:
        True when the profile is absent or was removed, False when removal failed.
    """
    profile_path = data_path / "chrome" / _get_actual_profile_name(profile_name)

    if not profile_path.exists():
        logging.info("Profile does not exist: %s", profile_path)
        return True

    try:
        shutil.rmtree(profile_path)
    except Exception:
        logging.exception("Failed to delete Chrome profile: %s", profile_path)
        return False

    logging.warning("Deleted Chrome profile: %s", profile_path)
    return True
×
359

360

361
def create_driver(
    profile_name: str,
    data_path: pathlib.Path,
    is_headless: bool = True,
    clean_profile: bool = False,
    auto_recover: bool = True,
    use_subprocess: bool = True,
) -> WebDriver:
    """Create a Chrome WebDriver, retrying once after a failed start.

    Args:
        profile_name: Profile name.
        data_path: Data directory path.
        is_headless: Start Chrome in headless mode.
        clean_profile: Remove the profile lock files before starting.
        auto_recover: Move a corrupted profile aside automatically.
        use_subprocess: Start Chrome in a subprocess.

    Returns:
        The created driver.

    Raises:
        Exception: Whatever the second creation attempt raises.
    """
    # NOTE: Keep Selenium-related loggers quiet even if the root logger level was changed.
    for noisy_logger in (
        "urllib3.connectionpool",
        "selenium.webdriver.common.selenium_manager",
        "selenium.webdriver.remote.remote_connection",
    ):
        logging.getLogger(noisy_logger).setLevel(logging.WARNING)

    profile_path = data_path / "chrome" / _get_actual_profile_name(profile_name)

    # Profile health check before the first start.
    health = _check_profile_health(profile_path)
    if not health.is_healthy:
        logging.warning("Profile health check failed: %s", ", ".join(health.errors))

        is_corrupted = health.has_corrupted_json or health.has_corrupted_db
        if health.has_lock_files and not is_corrupted:
            # Only lock files are wrong: delete them and carry on.
            logging.info("Cleaning up lock files only")
            _cleanup_profile_lock(profile_path)
        elif auto_recover and is_corrupted:
            # Corrupted JSON or database: recover the profile.
            logging.warning("Profile is corrupted, attempting recovery")
            if _recover_corrupted_profile(profile_path):
                logging.info("Profile recovery successful, will create new profile")
            else:
                logging.error("Profile recovery failed")

    if clean_profile:
        _cleanup_profile_lock(profile_path)

    # NOTE: Exactly one automatic retry.
    try:
        return _create_driver_impl(profile_name, data_path, is_headless, use_subprocess)
    except Exception as e:
        logging.warning("First attempt to create driver failed: %s", e)

        # Clean up leftover Chrome processes (only when running inside a container).
        _cleanup_orphaned_chrome_processes_in_container()

        # Remove the profile lock files.
        _cleanup_profile_lock(profile_path)

        # Check the profile again before retrying.
        health = _check_profile_health(profile_path)
        still_corrupted = health.has_corrupted_json or health.has_corrupted_db
        if not health.is_healthy and auto_recover and still_corrupted:
            logging.warning("Profile still corrupted after first attempt, recovering")
            _recover_corrupted_profile(profile_path)

        return _create_driver_impl(profile_name, data_path, is_headless, use_subprocess)
×
426

427

428
def xpath_exists(driver: WebDriver, xpath: str) -> bool:
    """Return True when at least one element matches ``xpath``."""
    matches = driver.find_elements(selenium.webdriver.common.by.By.XPATH, xpath)
    return len(matches) != 0
×
430

431

432
def get_text(
    driver: WebDriver,
    xpath: str,
    safe_text: str,
    wait: WebDriverWait[WebDriver] | None = None,
) -> str:
    """Return the stripped text of the first element matching ``xpath``.

    Args:
        driver: WebDriver instance.
        xpath: XPath of the element to read.
        safe_text: Value returned when no element matches.
        wait: When given, first wait until matching elements are present.

    Returns:
        The element's text with surrounding whitespace removed, or ``safe_text``.
    """
    by_xpath = selenium.webdriver.common.by.By.XPATH

    if wait is not None:
        wait.until(
            selenium.webdriver.support.expected_conditions.presence_of_all_elements_located((by_xpath, xpath))
        )

    # Look the element up once: a separate existence check followed by a second
    # lookup can fail if the element disappears in between.
    elements = driver.find_elements(by_xpath, xpath)
    if len(elements) == 0:
        return safe_text
    return elements[0].text.strip()
×
449

450

451
def input_xpath(
    driver: WebDriver,
    xpath: str,
    text: str,
    wait: WebDriverWait[WebDriver] | None = None,
    is_warn: bool = True,
) -> bool:
    """Type ``text`` into the first element matching ``xpath``.

    Args:
        driver: WebDriver instance.
        xpath: XPath of the input element.
        text: Keys to send.
        wait: When given, first wait until the element is clickable.
        is_warn: Log a warning when no element matches.

    Returns:
        True when the keys were sent, False when no element matched.
    """
    by_xpath = selenium.webdriver.common.by.By.XPATH

    if wait is not None:
        wait.until(selenium.webdriver.support.expected_conditions.element_to_be_clickable((by_xpath, xpath)))
        time.sleep(0.05)

    # Look the element up once: a separate existence check followed by a second
    # lookup can fail if the element disappears in between.
    elements = driver.find_elements(by_xpath, xpath)
    if len(elements) == 0:
        if is_warn:
            logging.warning("Element is not found: %s", xpath)
        return False

    elements[0].send_keys(text)
    return True
×
473

474

475
def click_xpath(
    driver: WebDriver,
    xpath: str,
    wait: WebDriverWait[WebDriver] | None = None,
    is_warn: bool = True,
    move: bool = False,
) -> bool:
    """Click the first element matching ``xpath``.

    Args:
        driver: WebDriver instance.
        xpath: XPath of the element to click.
        wait: When given, first wait until the element is clickable.
        is_warn: Log a warning when no element matches.
        move: Move the pointer onto the element before clicking.

    Returns:
        True when the element was clicked, False when no element matched.
    """
    by_xpath = selenium.webdriver.common.by.By.XPATH

    if wait is not None:
        wait.until(selenium.webdriver.support.expected_conditions.element_to_be_clickable((by_xpath, xpath)))
        time.sleep(0.05)

    # Look the element up once: a separate existence check followed by a second
    # lookup can fail if the element disappears in between.
    elements = driver.find_elements(by_xpath, xpath)
    if len(elements) == 0:
        if is_warn:
            logging.warning("Element is not found: %s", xpath)
        return False

    elem = elements[0]
    if move:
        action = selenium.webdriver.common.action_chains.ActionChains(driver)
        action.move_to_element(elem)
        action.perform()

    elem.click()
    return True
×
503

504

505
def is_display(driver: WebDriver, xpath: str) -> bool:
    """Return True when an element matches ``xpath`` and the first match is displayed."""
    # Look the element up once: a separate existence check followed by a second
    # lookup can fail if the element disappears in between.
    elements = driver.find_elements(selenium.webdriver.common.by.By.XPATH, xpath)
    if len(elements) == 0:
        return False
    return elements[0].is_displayed()
509

510

511
def random_sleep(sec: float) -> None:
    """Sleep for a random duration of at least 80% and less than 120% of ``sec``."""
    base_ratio = 0.8

    minimum = sec * base_ratio
    spread = sec * (1 - base_ratio) * 2
    time.sleep(minimum + spread * random.random())  # noqa: S311
1✔
515

516

517
def wait_patiently(
    driver: WebDriver,
    wait: WebDriverWait[WebDriver],
    target: Any,
) -> None:
    """Wait for ``target``, refreshing the page and retrying after a timeout.

    ``wait.until(target)`` is attempted ``WAIT_RETRY_COUNT + 1`` times. After
    every timeout except the last one the page is refreshed.

    Args:
        driver: WebDriver instance, used to refresh the page between attempts.
        wait: WebDriverWait used for each attempt.
        target: Condition passed to ``wait.until``.

    Raises:
        selenium.common.exceptions.TimeoutException: The timeout of the last
            attempt, when every attempt timed out.
    """
    error: selenium.common.exceptions.TimeoutException | None = None
    for i in range(WAIT_RETRY_COUNT + 1):
        try:
            wait.until(target)
            return
        except selenium.common.exceptions.TimeoutException as e:  # noqa: PERF203
            # inspect.stack() is expensive, so the caller's frame info is captured once.
            caller = inspect.stack()[1]
            logging.warning(
                "タイムアウトが発生しました。(%s in %s line %d)",
                caller.function,
                caller.filename,
                caller.lineno,
            )
            error = e

            logging.info(i)
            if i != WAIT_RETRY_COUNT:
                logging.info("refresh")
                driver.refresh()

    if error is not None:
        raise error
×
543

544

545
def dump_page(
    driver: WebDriver,
    index: int,
    dump_path: pathlib.Path,
    stack_index: int = 1,
) -> None:
    """Save a screenshot and the HTML source of the current page.

    The files are named ``<function>_<index>.png`` and ``<function>_<index>.htm``,
    where ``<function>`` is the name of the function found ``stack_index``
    frames up the call stack, with ``<`` and ``>`` removed.

    Args:
        driver: WebDriver instance.
        index: Number embedded (zero-padded to two digits) in the file names.
        dump_path: Output directory; created when missing.
        stack_index: How many frames up the stack the naming function is.
    """
    # inspect.stack() is expensive, so the frame info is captured once and
    # reused for both the file name and the log line.
    caller = inspect.stack()[stack_index]
    name = caller.function.replace("<", "").replace(">", "")

    dump_path.mkdir(parents=True, exist_ok=True)

    png_path = dump_path / f"{name}_{index:02d}.png"
    htm_path = dump_path / f"{name}_{index:02d}.htm"

    driver.save_screenshot(str(png_path))

    with htm_path.open("w", encoding="utf-8") as f:
        f.write(driver.page_source)

    logging.info(
        "page dump: %02d from %s in %s line %d",
        index,
        caller.function,
        caller.filename,
        caller.lineno,
    )
570

571

572
def clear_cache(driver: WebDriver) -> None:
    """Issue the ``Network.clearBrowserCache`` CDP command with no parameters."""
    command = "Network.clearBrowserCache"
    driver.execute_cdp_cmd(command, {})
×
574

575

576
def clean_dump(dump_path: pathlib.Path, keep_days: int = 1) -> None:
    """Delete files directly inside ``dump_path`` that are older than ``keep_days``.

    Only regular files are considered; sub-directories are left untouched.

    Args:
        dump_path: Directory to clean. Nothing happens when it does not exist.
        keep_days: Files modified more than this many days ago are removed.
    """
    if not dump_path.exists():
        return

    max_age = datetime.timedelta(keep_days)
    utc = datetime.timezone.utc

    for entry in dump_path.iterdir():
        if not entry.is_file():
            continue
        try:
            age = datetime.datetime.now(utc) - datetime.datetime.fromtimestamp(entry.stat().st_mtime, utc)
        except FileNotFoundError:
            # The file was deleted by another process in the meantime
            # (e.g. a temporary SQLite file).
            continue
        if age <= max_age:
            continue

        logging.warning("remove %s [%s day(s) old].", entry.absolute(), f"{age.days:,}")
        entry.unlink(missing_ok=True)
1✔
596

597

598
def get_memory_info(driver: WebDriver) -> dict[str, Any]:
    """Collect the browser's memory usage (unit: KB).

    The total is read from ``smem`` (last line of its PSS column for processes
    matching ``chrome``); the JS heap figures come from CDP.

    Args:
        driver: WebDriver instance used for the CDP queries.

    Returns:
        A dict with ``total`` (KB, 0 when ``smem`` gives no usable number),
        ``heap_used`` and ``heap_total`` (KB, 0 when the CDP queries fail) and
        ``memory_info`` (raw ``Memory.getAllTimeSamplingProfile`` result, or None).
    """
    # The command is a fixed string with no external input, so shell=True is
    # only used for the pipe.
    total_bytes = subprocess.Popen(  # noqa: S602
        "smem -t -c pss -P chrome | tail -n 1",  # noqa: S607
        shell=True,
        stdout=subprocess.PIPE,
    ).communicate()[0]
    try:
        total = int(str(total_bytes, "utf-8").strip())  # smem reports KB
    except ValueError:
        # smem is missing or printed nothing numeric: stay best-effort, like
        # the heap figures below, instead of failing the whole call.
        logging.debug("Failed to get total memory usage from smem")
        total = 0

    try:
        memory_info = driver.execute_cdp_cmd("Memory.getAllTimeSamplingProfile", {})
        heap_usage = driver.execute_cdp_cmd("Runtime.getHeapUsage", {})

        heap_used = heap_usage.get("usedSize", 0) // 1024  # bytes -> KB
        heap_total = heap_usage.get("totalSize", 0) // 1024  # bytes -> KB
    except Exception as e:
        logging.debug("Failed to get memory usage: %s", e)

        memory_info = None
        heap_used = 0
        heap_total = 0

    return {
        "total": total,
        "heap_used": heap_used,
        "heap_total": heap_total,
        "memory_info": memory_info,
    }
626

627

628
def log_memory_usage(driver: WebDriver) -> None:
    """Log Chrome's total memory and used JS heap, both converted from KB to MB."""
    usage = get_memory_info(driver)
    total_mb = usage["total"] // 1024
    heap_mb = usage["heap_used"] // 1024
    logging.info(
        "Chrome memory: %s MB (JS heap: %s MB)",
        f"{total_mb:,}",
        f"{heap_mb:,}",
    )
635

636

637
def _warmup(
    driver: WebDriver,
    keyword: str,
    url_pattern: str,
    sleep_sec: int = 3,
) -> None:
    """Search Yahoo! JAPAN for ``keyword`` and click a result link.

    NOTE: Dummy browsing meant to make the session look less like a bot
    (it does not seem to help much).

    Args:
        driver: WebDriver instance.
        keyword: Text typed into the search box.
        url_pattern: Substring the ``href`` of the clicked link must contain.
        sleep_sec: Seconds to sleep after each step.
    """
    by_xpath = selenium.webdriver.common.by.By.XPATH
    search_box_xpath = '//input[@name="p"]'
    result_link_xpath = f'//a[contains(@href, "{url_pattern}")]'

    driver.get("https://www.yahoo.co.jp/")
    time.sleep(sleep_sec)

    # The search box is looked up again before pressing ENTER.
    driver.find_element(by_xpath, search_box_xpath).send_keys(keyword)
    driver.find_element(by_xpath, search_box_xpath).send_keys(selenium.webdriver.common.keys.Keys.ENTER)
    time.sleep(sleep_sec)

    driver.find_element(by_xpath, result_link_xpath).click()
    time.sleep(sleep_sec)
×
659

660

661
class browser_tab:  # noqa: N801
    """Context manager that loads a URL in a new tab and closes it afterwards.

    On entry a new tab is opened, selected, and pointed at ``url``. On exit
    every tab but one is closed and the window that was current on entry is
    selected again. If the ``with`` body raised, the page-load timeout is
    reset and ``about:blank`` is loaded.
    """

    def __init__(self, driver: WebDriver, url: str) -> None:  # noqa: D107
        self.driver = driver
        self.url = url
        # Handle of the window that was current when the context was entered.
        self.original_window: str | None = None

    def __enter__(self) -> None:  # noqa: D105
        drv = self.driver
        self.original_window = drv.current_window_handle
        drv.execute_script("window.open('');")
        drv.switch_to.window(drv.window_handles[-1])
        try:
            drv.get(self.url)
        except Exception:
            # NOTE: Clean up even when loading the URL fails, then re-raise.
            self._cleanup()
            raise

    def _cleanup(self) -> None:
        """Close the extra tabs and switch back to the original window."""
        drv = self.driver
        try:
            # Close tabs from the end until a single one is left.
            while len(drv.window_handles) > 1:
                drv.switch_to.window(drv.window_handles[-1])
                drv.close()
            if self.original_window is not None:
                drv.switch_to.window(self.original_window)
            time.sleep(0.1)
        except Exception:
            # NOTE: Ignored on purpose: if Chrome crashed it has already exited
            # and cannot be operated any more.
            logging.exception("タブのクリーンアップに失敗しました(Chromeがクラッシュした可能性があります)")

    def _recover_from_error(self) -> None:
        """Bring the browser back to a usable state after an error."""
        drv = self.driver
        try:
            # Reset the page-load timeout (it may have become negative).
            drv.set_page_load_timeout(30)

            # Load about:blank to reset the renderer state.
            drv.get("about:blank")
            time.sleep(0.5)
        except Exception:
            logging.warning("ブラウザの回復に失敗しました")

    def __exit__(
        self,
        exception_type: type[BaseException] | None,
        exception_value: BaseException | None,
        traceback: Any,
    ) -> None:  # noqa: D105
        self._cleanup()

        if exception_type is None:
            return

        # The body raised: recover the browser state.
        self._recover_from_error()
×
715

716

717
class error_handler:  # noqa: N801
    """Context manager for error handling around Selenium operations.

    When the body raises, the error is logged, a screenshot is optionally
    captured, and an optional callback is invoked.

    Args:
        driver: WebDriver instance.
        message: Error message written to the log.
        on_error: Callback invoked on error as
            ``(exception, screenshot: PIL.Image.Image | None) -> None``.
        capture_screenshot: Capture a screenshot automatically (default: True).
        reraise: Re-raise the exception (default: True).

    Attributes:
        exception: The exception that occurred (None when there was no error).
        screenshot: The captured screenshot (PIL.Image.Image, None when capture failed).

    Examples:
        Basic usage::

            with my_lib.selenium_util.error_handler(driver, message="login failed") as handler:
                driver.get(login_url)
                driver.find_element(...).click()

        With a callback (e.g. a Slack notification)::

            def notify(exc, screenshot):
                slack.error("error occurred", str(exc), screenshot)

            with my_lib.selenium_util.error_handler(
                driver,
                message="crawl failed",
                on_error=notify,
            ):
                crawl_page(driver)

        Suppressing the exception and carrying on::

            with my_lib.selenium_util.error_handler(driver, reraise=False) as handler:
                risky_operation()

            if handler.exception:
                logging.warning("skipped the operation")
    """

    def __init__(
        self,
        driver: WebDriver,
        message: str = "Selenium operation failed",
        on_error: Callable[[Exception, PIL.Image.Image | None], None] | None = None,
        capture_screenshot: bool = True,
        reraise: bool = True,
    ) -> None:
        self.driver = driver
        self.message = message
        self.on_error = on_error
        self.capture_screenshot = capture_screenshot
        self.reraise = reraise
        self.exception: Exception | None = None
        self.screenshot: PIL.Image.Image | None = None

    def __enter__(self) -> "error_handler":
        return self

    def __exit__(
        self,
        exception_type: type[BaseException] | None,
        exception_value: BaseException | None,
        traceback: Any,
    ) -> bool:
        # Nothing was raised, or it is a bare BaseException (KeyboardInterrupt
        # etc.) that must propagate untouched.
        if exception_value is None or not isinstance(exception_value, Exception):
            return False

        # Record the exception and log it.
        self.exception = exception_value
        logging.exception(self.message)

        # Screenshot capture is best-effort.
        if self.capture_screenshot:
            try:
                png_bytes = self.driver.get_screenshot_as_png()
                self.screenshot = PIL.Image.open(io.BytesIO(png_bytes))
            except Exception:
                logging.debug("Failed to capture screenshot for error handling")

        # A failing callback must not mask the original error.
        callback = self.on_error
        if callback is not None:
            try:
                callback(self.exception, self.screenshot)
            except Exception:
                logging.exception("Error in on_error callback")

        # Returning True suppresses the exception (reraise=False).
        return not self.reraise
1✔
816

817

818
def _is_chrome_related_process(process: psutil.Process) -> bool:
    """Decide from its name whether a process belongs to Chrome.

    Args:
        process: Process to inspect.

    Returns:
        True when the lower-cased name contains one of the Chrome name
        patterns; False for chromedriver, for other names, and when the
        process has gone away or access to it is denied.
    """
    try:
        lowered_name = process.name().lower()
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        return False

    # chromedriver is deliberately not counted as Chrome.
    if "chromedriver" in lowered_name:
        return False

    # Name patterns of Chrome-related processes.
    chrome_markers = ("chrome", "chromium", "google-chrome", "undetected_chro")
    return any(marker in lowered_name for marker in chrome_markers)
1✔
830

831

832
def _get_chrome_processes_by_pgid(chromedriver_pid: int, existing_pids: set[int]) -> list[int]:
    """Find further Chrome-related processes that share chromedriver's process group.

    Args:
        chromedriver_pid: PID whose process group ID is used as the reference.
        existing_pids: PIDs that are already known and must be skipped.

    Returns:
        PIDs of Chrome-related processes in the same process group that are
        not in ``existing_pids``. Empty when the group ID cannot be obtained.
    """
    found: list[int] = []
    try:
        target_pgid = os.getpgid(chromedriver_pid)
        for proc in psutil.process_iter(["pid", "name", "ppid"]):
            pid = proc.info["pid"]
            if pid in existing_pids:
                continue
            try:
                if os.getpgid(pid) != target_pgid:
                    continue
                if not _is_chrome_related_process(psutil.Process(pid)):
                    continue
                found.append(pid)
                logging.debug(
                    "Found Chrome-related process by pgid: PID %d, name: %s",
                    pid,
                    proc.info["name"],
                )
            except (psutil.NoSuchProcess, psutil.AccessDenied, OSError):
                # The process went away or cannot be inspected; skip it.
                pass
    except (OSError, psutil.NoSuchProcess):
        logging.debug("Failed to get process group ID for chromedriver")
    return found
×
855

856

857
def _get_chrome_related_processes(driver: WebDriver) -> list[int]:
    """Collect the PIDs of all Chrome-related child processes.

    Per the original author's note: with undetected_chromedriver, Chrome may be
    started as a direct child of the Python process rather than as a child of
    chromedriver, so both process trees are searched.

    Args:
        driver: WebDriver whose ``service.process`` (if present) is used as the
            root of the first search.

    Returns:
        De-duplicated list of PIDs made of (1) every descendant of
        ``driver.service.process`` and (2) every descendant of the current
        Python process that ``_is_chrome_related_process`` accepts.  Failures
        in either search are logged and do not propagate.
    """
    chrome_pids: set[int] = set()

    # 1. Search the descendants of driver.service.process.
    try:
        if hasattr(driver, "service") and driver.service and hasattr(driver.service, "process"):  # type: ignore[attr-defined]
            process = driver.service.process  # type: ignore[attr-defined]
            if process and hasattr(process, "pid"):
                chromedriver_pid = process.pid

                # Walk the process hierarchy with psutil.
                parent_process = psutil.Process(chromedriver_pid)
                for child in parent_process.children(recursive=True):
                    chrome_pids.add(child.pid)
                    # A child may exit between children() and name(); that must
                    # not abort the collection of the remaining children.
                    try:
                        child_name = child.name()
                    except (psutil.NoSuchProcess, psutil.AccessDenied):
                        child_name = "unknown"
                    logging.debug(
                        "Found Chrome-related process (service child): PID %d, name: %s",
                        child.pid,
                        child_name,
                    )
    except Exception:
        logging.exception("Failed to get Chrome-related processes from service")

    # 2. Search every descendant of the current Python process for
    #    Chrome-related processes.
    try:
        current_process = psutil.Process()
        for child in current_process.children(recursive=True):
            if child.pid in chrome_pids:
                continue
            try:
                if _is_chrome_related_process(child):
                    chrome_pids.add(child.pid)
                    logging.debug(
                        "Found Chrome-related process (python child): PID %d, name: %s",
                        child.pid,
                        child.name(),
                    )
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # The PID is already recorded; only the log line is lost.
                pass
    except Exception:
        logging.exception("Failed to get Chrome-related processes from python children")

    return list(chrome_pids)
908

909

910
def _send_signal_to_processes(pids: list[int], sig: signal.Signals, signal_name: str) -> None:
    """Send a signal to every PID in a list.

    Args:
        pids: Target process IDs.
        sig: Signal to deliver with ``os.kill``.
        signal_name: Label for ``sig`` used in log messages.

    Each successful delivery is logged at info level.  ``OSError`` failures
    (for example the process has already exited) are collected and reported
    together in one debug message after the loop.
    """
    failures = []
    for pid in pids:
        try:
            # Resolve the process name for the log message.
            try:
                process_name = psutil.Process(pid).name()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                process_name = "unknown"

            if sig == signal.SIGKILL:
                # Signal 0 only probes that the process still exists.
                os.kill(pid, 0)
            os.kill(pid, sig)
            logging.info("Sent %s to process: PID %d (%s)", signal_name, pid, process_name)
        except OSError as e:  # noqa: PERF203
            # Covers ProcessLookupError: the process is already gone.
            failures.append((pid, e))

    # Report all failures in a single log record.
    if failures:
        logging.debug("Failed to send %s to some processes: %s", signal_name, failures)
934

935

936
def _terminate_chrome_processes(chrome_pids: list[int], timeout: float = 5.0) -> None:
    """Terminate Chrome-related processes in stages: SIGTERM first, then SIGKILL.

    Args:
        chrome_pids: Process IDs to terminate.  Nothing happens when empty.
        timeout: Maximum time (seconds, counted as the sum of poll sleeps) to
            wait for the processes to exit after SIGTERM.
    """
    if not chrome_pids:
        return

    def _still_running(pid: int) -> bool:
        # Alive means: the PID exists, is running, and is not a zombie.
        try:
            if not psutil.pid_exists(pid):
                return False
            proc = psutil.Process(pid)
            return proc.is_running() and proc.status() != psutil.STATUS_ZOMBIE
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            return False

    # Graceful termination (SIGTERM).
    _send_signal_to_processes(chrome_pids, signal.SIGTERM, "SIGTERM")

    # Poll until every process is gone or the timeout is used up.
    poll_interval = 0.2
    elapsed = 0.0
    survivors = list(chrome_pids)
    while survivors and elapsed < timeout:
        time.sleep(poll_interval)
        elapsed += poll_interval
        survivors = [pid for pid in survivors if _still_running(pid)]

    # Only processes that outlived the timeout receive SIGKILL.
    if survivors:
        logging.warning(
            "Chrome processes still alive after %.1fs, sending SIGKILL to %d processes",
            elapsed,
            len(survivors),
        )
        _send_signal_to_processes(survivors, signal.SIGKILL, "SIGKILL")
979

980

981
def _reap_single_process(pid: int) -> None:
    """Reap a single process with a non-blocking ``waitpid``.

    Args:
        pid: Process ID to reap.

    When ``os.waitpid`` reports that ``pid`` was collected, a debug message
    with the process name is logged.  ``OSError`` (including
    ``ChildProcessError``: not a child of this process, or already reaped) is
    ignored.
    """
    try:
        # Resolve the name BEFORE reaping: once waitpid() has collected the
        # process its PID no longer refers to it, so a later lookup could only
        # fail or describe an unrelated process that reused the PID.
        try:
            process_name = psutil.Process(pid).name()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            process_name = "unknown"

        # Non-blocking waitpid.
        reaped_pid, _status = os.waitpid(pid, os.WNOHANG)
        if reaped_pid == pid:
            logging.debug("Reaped Chrome process: PID %d (%s)", pid, process_name)
    except OSError:
        # Not a child process, or already reaped: nothing to do.
        pass
997

998

999
def _reap_chrome_processes(chrome_pids: list[int]) -> None:
    """Explicitly reap each listed Chrome-related process (intended to avoid zombies).

    Args:
        chrome_pids: Process IDs handed one by one to ``_reap_single_process``.
    """
    for target_pid in chrome_pids:
        _reap_single_process(target_pid)
1003

1004

1005
def _get_remaining_chrome_pids(chrome_pids: list[int]) -> list[int]:
    """Filter a PID list down to the Chrome-related processes that are still alive.

    Args:
        chrome_pids: Process IDs to check.

    Returns:
        The PIDs, in input order, that exist, are running, are not zombies and
        are accepted by ``_is_chrome_related_process``.  PIDs that raise
        ``psutil.NoSuchProcess`` / ``psutil.AccessDenied`` are dropped.
    """
    survivors = []
    for pid in chrome_pids:
        try:
            if not psutil.pid_exists(pid):
                continue
            candidate = psutil.Process(pid)
            if not candidate.is_running():
                continue
            if candidate.status() == psutil.STATUS_ZOMBIE:
                continue
            if _is_chrome_related_process(candidate):
                survivors.append(pid)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    return survivors
1018

1019

1020
def _wait_for_processes_with_check(
    chrome_pids: list[int],
    timeout: float,
    poll_interval: float = 0.2,
    log_interval: float = 1.0,
) -> list[int]:
    """Wait for processes to exit while periodically re-checking which remain.

    Args:
        chrome_pids: Process IDs to watch.
        timeout: Maximum wait (seconds), counted as the sum of poll sleeps.
        poll_interval: Sleep between checks (seconds).
        log_interval: Minimum waited time between progress log lines (seconds).

    Returns:
        The process IDs still remaining when the wait ends (a new list; empty
        if every process exited).
    """
    pending = list(chrome_pids)
    waited = 0.0
    logged_at = 0.0

    while pending and waited < timeout:
        time.sleep(poll_interval)
        waited += poll_interval
        pending = _get_remaining_chrome_pids(pending)

        if pending:
            if (waited - logged_at) >= log_interval:
                logging.info(
                    "Found %d remaining Chrome processes after %.0fs",
                    len(pending),
                    waited,
                )
                logged_at = waited

    return pending
1055

1056

1057
def quit_driver_gracefully(
    driver: WebDriver | None,
    wait_sec: float = 5.0,
    sigterm_wait_sec: float = 5.0,
    sigkill_wait_sec: float = 5.0,
) -> None:  # noqa: C901, PLR0912
    """Shut down a Chrome WebDriver and make sure its processes are gone.

    Shutdown flow:
    1. Call ``driver.quit()`` and try to stop the driver service.
    2. Wait up to ``wait_sec`` seconds, checking whether the processes exited.
    3. Send SIGTERM to any remaining processes.
    4. Wait up to ``sigterm_wait_sec`` seconds, checking again.
    5. Send SIGKILL to any remaining processes.
    6. Wait up to ``sigkill_wait_sec`` seconds for them to disappear, then
       reap the recorded processes and warn about any that are still alive.

    Args:
        driver: WebDriver instance to shut down.  ``None`` is a no-op.
        wait_sec: Maximum seconds to wait for process exit after quit
            (default: 5).
        sigterm_wait_sec: Maximum seconds to wait for process exit after
            SIGTERM (default: 5).
        sigkill_wait_sec: Maximum seconds to wait after SIGKILL before reaping
            (default: 5).
    """
    if driver is None:
        return

    # Record the Chrome-related processes before quitting.
    chrome_pids_before = _get_chrome_related_processes(driver)

    try:
        # Try a normal WebDriver shutdown (this also closes the tabs).
        driver.quit()
        logging.info("WebDriver quit successfully")
    except Exception:
        logging.warning("Failed to quit driver normally", exc_info=True)
    finally:
        # Per the original author's note: keeps undetected_chromedriver's
        # __del__ from running the quit logic again at interpreter shutdown.
        if hasattr(driver, "_has_quit"):
            driver._has_quit = True  # type: ignore[attr-defined]

    # Try to stop the ChromeDriver service.
    try:
        if hasattr(driver, "service") and driver.service and hasattr(driver.service, "stop"):  # type: ignore[attr-defined]
            driver.service.stop()  # type: ignore[attr-defined]
    except (ConnectionResetError, OSError):
        # Chrome has already exited; nothing to stop.
        logging.debug("Chrome service already stopped")
    except Exception:
        logging.warning("Failed to stop Chrome service", exc_info=True)

    # Step 1: after quit, wait up to wait_sec while checking for process exit.
    remaining_pids = _wait_for_processes_with_check(chrome_pids_before, wait_sec)

    if not remaining_pids:
        logging.debug("All Chrome processes exited normally")
        return

    # Step 2: send SIGTERM to the remaining processes.
    logging.info(
        "Found %d remaining Chrome processes after %.0fs, sending SIGTERM",
        len(remaining_pids),
        wait_sec,
    )
    _send_signal_to_processes(remaining_pids, signal.SIGTERM, "SIGTERM")

    # Step 3: after SIGTERM, wait up to sigterm_wait_sec while checking.
    remaining_pids = _wait_for_processes_with_check(remaining_pids, sigterm_wait_sec)

    if not remaining_pids:
        logging.info("All Chrome processes exited after SIGTERM")
        _reap_chrome_processes(chrome_pids_before)
        return

    # Step 4: send SIGKILL to the remaining processes.
    logging.warning(
        "Chrome processes still alive after SIGTERM + %.1fs, sending SIGKILL to %d processes",
        sigterm_wait_sec,
        len(remaining_pids),
    )
    _send_signal_to_processes(remaining_pids, signal.SIGKILL, "SIGKILL")

    # Step 5: poll (instead of sleeping unconditionally) for up to
    # sigkill_wait_sec so we return as soon as the killed processes are gone,
    # then reap everything that was recorded before quit.
    _wait_for_processes_with_check(remaining_pids, sigkill_wait_sec)
    _reap_chrome_processes(chrome_pids_before)

    # Final check: warn about every process that is still alive.
    for pid in _get_remaining_chrome_pids(remaining_pids):
        try:
            process = psutil.Process(pid)
            logging.warning("Failed to collect Chrome-related process: PID %d (%s)", pid, process.name())
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass
1152

1153

1154
if __name__ == "__main__":
    # Manual check: start a driver, load Google, wait for an input element
    # whose value contains "Google", then shut the driver down.
    import pathlib

    import docopt
    import selenium.webdriver.support.wait

    import my_lib.config
    import my_lib.logger

    assert __doc__ is not None
    args = docopt.docopt(__doc__)

    # -c: config file path, -D: debug logging.
    config_file, debug_mode = args["-c"], args["-D"]

    log_level = logging.DEBUG if debug_mode else logging.INFO
    my_lib.logger.init("test", level=log_level)

    config = my_lib.config.load(config_file)

    selenium_data_path = pathlib.Path(config["data"]["selenium"])
    driver = create_driver("test", selenium_data_path)
    wait = selenium.webdriver.support.wait.WebDriverWait(driver, 5)

    driver.get("https://www.google.com/")
    locator = (selenium.webdriver.common.by.By.XPATH, '//input[contains(@value, "Google")]')
    wait.until(selenium.webdriver.support.expected_conditions.presence_of_element_located(locator))

    quit_driver_gracefully(driver)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc