• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kimata / my-py-lib / 21092969449

17 Jan 2026 10:39AM UTC coverage: 62.974% (+2.0%) from 60.961%
21092969449

push

github

kimata
refactor: 型安全性向上とコード品質改善(第4弾)

## 主な変更

### センサー
- ADS1015/ADS1115 の重複コードを ads_base.py に統合
- センサー系の型注釈改善

### json_util.py
- iso_pattern の重複定義をモジュールレベル定数に統一
- DateTimeJSONEncoder.default() の type: ignore を削除

### store/mercari
- MercariItem dataclass を導入し、item 辞書を型安全に
- ProgressObserver Protocol の型定義を MercariItem に変更

### store/amazon
- AmazonItem.to_dict() を dataclasses.asdict() で簡潔化

### その他
- flask_util.py の type: ignore を cast() に置換
- lifecycle_manager.py を削除(lifecycle/manager.py に移行済み)
- 各種型注釈・docstring の改善

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

191 of 246 new or added lines in 29 files covered. (77.64%)

112 existing lines in 12 files now uncovered.

3439 of 5461 relevant lines covered (62.97%)

0.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.04
/src/my_lib/selenium_util.py
1
#!/usr/bin/env python3
2
"""
3
Selenium を Chrome Driver を使って動かします。
4

5
Usage:
6
  selenium_util.py [-c CONFIG] [-D]
7

8
Options:
9
  -c CONFIG         : CONFIG を設定ファイルとして読み込んで実行します。
10
                      [default: tests/fixtures/config.example.yaml]
11
  -D                : デバッグモードで動作します。
12
"""
13

14
from __future__ import annotations
1✔
15

16
import datetime
1✔
17
import inspect
1✔
18
import io
1✔
19
import logging
1✔
20
import os
1✔
21
import pathlib
1✔
22
import random
1✔
23
import re
1✔
24
import signal
1✔
25
import subprocess
1✔
26
import time
1✔
27
from typing import TYPE_CHECKING, Any, Self, TypeVar
1✔
28

29
import PIL.Image
1✔
30
import psutil
1✔
31
import selenium
1✔
32
import selenium.common.exceptions
1✔
33
import selenium.webdriver.chrome.options
1✔
34
import selenium.webdriver.chrome.service
1✔
35
import selenium.webdriver.common.action_chains
1✔
36
import selenium.webdriver.common.by
1✔
37
import selenium.webdriver.common.keys
1✔
38
import selenium.webdriver.support.expected_conditions
1✔
39

40
import my_lib.chrome_util
1✔
41
import my_lib.time
1✔
42

43
if TYPE_CHECKING:
44
    import types
45
    from collections.abc import Callable
46

47
    from selenium.webdriver.remote.webdriver import WebDriver
48
    from selenium.webdriver.support.wait import WebDriverWait
49

50
T = TypeVar("T")
1✔
51

52
WAIT_RETRY_COUNT: int = 1
1✔
53

54

55
class SeleniumError(Exception):
1✔
56
    """Selenium 関連エラーの基底クラス"""
57

58

59
def _get_chrome_version() -> int | None:
1✔
60
    try:
1✔
61
        result = subprocess.run(
1✔
62
            ["google-chrome", "--version"],  # noqa: S607
63
            capture_output=True,
64
            text=True,
65
            timeout=10,
66
            check=False,
67
        )
68
        match = re.search(r"(\d+)\.", result.stdout)
1✔
69
        if match:
1✔
70
            return int(match.group(1))
1✔
71
    except Exception:
1✔
72
        logging.warning("Failed to detect Chrome version")
1✔
73
    return None
1✔
74

75

76
def _create_chrome_options(
1✔
77
    profile_name: str,
78
    chrome_data_path: pathlib.Path,
79
    log_path: pathlib.Path,
80
    is_headless: bool,
81
) -> selenium.webdriver.chrome.options.Options:
82
    """Chrome オプションを作成する
83

84
    Args:
85
        profile_name: プロファイル名
86
        chrome_data_path: Chrome データディレクトリのパス
87
        log_path: ログディレクトリのパス
88
        is_headless: ヘッドレスモードで起動するか
89

90
    Returns:
91
        設定済みの Chrome オプション
92

93
    """
94
    options = selenium.webdriver.chrome.options.Options()
×
95

96
    if is_headless:
×
97
        options.add_argument("--headless=new")
×
98

99
    options.add_argument("--no-sandbox")  # for Docker
×
100
    options.add_argument("--disable-dev-shm-usage")  # for Docker
×
101
    options.add_argument("--disable-gpu")
×
102

103
    options.add_argument("--disable-popup-blocking")
×
104
    options.add_argument("--disable-plugins")
×
105

106
    options.add_argument("--no-first-run")
×
107

108
    options.add_argument("--lang=ja-JP")
×
109
    options.add_argument("--window-size=1920,1080")
×
110

111
    # NOTE: Accept-Language ヘッダーを日本語優先に設定
112
    options.add_experimental_option("prefs", {"intl.accept_languages": "ja-JP,ja,en-US,en"})
×
113

114
    options.add_argument("--user-data-dir=" + str(chrome_data_path / profile_name))
×
115

116
    options.add_argument("--enable-logging")
×
117
    options.add_argument("--v=1")
×
118

119
    chrome_log_file = log_path / f"chrome_{profile_name}.log"
×
120
    options.add_argument(f"--log-file={chrome_log_file!s}")
×
121

122
    if not is_headless:
×
123
        options.add_argument("--auto-open-devtools-for-tabs")
×
124

125
    return options
×
126

127

128
def _create_driver_impl(
1✔
129
    profile_name: str,
130
    data_path: pathlib.Path,
131
    is_headless: bool,
132
    use_subprocess: bool,
133
    use_undetected: bool,
134
) -> WebDriver:
135
    """WebDriver を作成する内部実装
136

137
    Args:
138
        profile_name: プロファイル名
139
        data_path: データディレクトリのパス
140
        is_headless: ヘッドレスモードで起動するか
141
        use_subprocess: サブプロセスで Chrome を起動するか(undetected_chromedriver のみ)
142
        use_undetected: undetected_chromedriver を使用するか
143

144
    Returns:
145
        WebDriver インスタンス
146

147
    """
148
    chrome_data_path = data_path / "chrome"
×
149
    log_path = data_path / "log"
×
150

151
    # NOTE: Pytest を並列実行できるようにする
152
    suffix = os.environ.get("PYTEST_XDIST_WORKER", None)
×
153
    if suffix is not None:
×
154
        profile_name += "." + suffix
×
155

156
    chrome_data_path.mkdir(parents=True, exist_ok=True)
×
157
    log_path.mkdir(parents=True, exist_ok=True)
×
158

159
    options = _create_chrome_options(profile_name, chrome_data_path, log_path, is_headless)
×
160

161
    service = selenium.webdriver.chrome.service.Service(
×
162
        service_args=["--verbose", f"--log-path={str(log_path / 'webdriver.log')!s}"],
163
    )
164

165
    if use_undetected:
×
166
        import undetected_chromedriver
×
167

168
        chrome_version = _get_chrome_version()
×
169

170
        # NOTE: user_multi_procs=True は既存の chromedriver ファイルが存在することを前提としているため、
171
        # ファイルが存在しない場合(CI環境の初回実行など)は False にする
172
        uc_data_path = pathlib.Path("~/.local/share/undetected_chromedriver").expanduser()
×
173
        use_multi_procs = uc_data_path.exists() and any(uc_data_path.glob("*chromedriver*"))
×
174

175
        driver = undetected_chromedriver.Chrome(
×
176
            service=service,
177
            options=options,
178
            use_subprocess=use_subprocess,
179
            version_main=chrome_version,
180
            user_multi_procs=use_multi_procs,
181
        )
182
    else:
183
        driver = selenium.webdriver.Chrome(
×
184
            service=service,
185
            options=options,
186
        )
187

188
    driver.set_page_load_timeout(30)
×
189

190
    # CDP を使って日本語ロケールを強制設定
191
    set_japanese_locale(driver)
×
192

193
    return driver
×
194

195

196
def create_driver(
1✔
197
    profile_name: str,
198
    data_path: pathlib.Path,
199
    is_headless: bool = True,
200
    clean_profile: bool = False,
201
    auto_recover: bool = True,
202
    use_undetected: bool = True,
203
    use_subprocess: bool = False,
204
) -> WebDriver:
205
    """Chrome WebDriver を作成する
206

207
    Args:
208
        profile_name: プロファイル名
209
        data_path: データディレクトリのパス
210
        is_headless: ヘッドレスモードで起動するか
211
        clean_profile: 起動前にロックファイルを削除するか
212
        auto_recover: プロファイル破損時に自動リカバリするか
213
        use_undetected: undetected_chromedriver を使用するか(デフォルト: True)
214
        use_subprocess: サブプロセスで Chrome を起動するか(undetected_chromedriver のみ)
215

216
    Raises:
217
        ValueError: use_undetected=False かつ use_subprocess=True の場合
218

219
    """
220
    if not use_undetected and use_subprocess:
×
221
        raise ValueError("use_subprocess=True は use_undetected=True の場合のみ有効です")
×
222

223
    # NOTE: ルートロガーの出力レベルを変更した場合でも Selenium 関係は抑制する
224
    logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)
×
225
    logging.getLogger("selenium.webdriver.common.selenium_manager").setLevel(logging.WARNING)
×
226
    logging.getLogger("selenium.webdriver.remote.remote_connection").setLevel(logging.WARNING)
×
227

228
    # NOTE: chrome_util の内部関数を使用(同一パッケージ内での使用は許容)
229
    actual_profile_name = my_lib.chrome_util._get_actual_profile_name(profile_name)
×
230
    profile_path = data_path / "chrome" / actual_profile_name
×
231

232
    # プロファイル健全性チェック
233
    health = my_lib.chrome_util._check_profile_health(profile_path)
×
234
    if not health.is_healthy:
×
235
        logging.warning("Profile health check failed: %s", ", ".join(health.errors))
×
236

237
        if health.has_lock_files and not (health.has_corrupted_json or health.has_corrupted_db):
×
238
            # ロックファイルのみの問題なら削除して続行
239
            logging.info("Cleaning up lock files only")
×
240
            my_lib.chrome_util._cleanup_profile_lock(profile_path)
×
241
        elif auto_recover and (health.has_corrupted_json or health.has_corrupted_db):
×
242
            # JSON または DB が破損している場合はプロファイルをリカバリ
243
            logging.warning("Profile is corrupted, attempting recovery")
×
244
            if my_lib.chrome_util._recover_corrupted_profile(profile_path):
×
245
                logging.info("Profile recovery successful, will create new profile")
×
246
            else:
247
                logging.error("Profile recovery failed")
×
248

249
    if clean_profile:
×
250
        my_lib.chrome_util._cleanup_profile_lock(profile_path)
×
251

252
    # NOTE: 1回だけ自動リトライ
253
    try:
×
254
        return _create_driver_impl(profile_name, data_path, is_headless, use_subprocess, use_undetected)
×
255
    except Exception as e:
×
256
        logging.warning("First attempt to create driver failed: %s", e)
×
257

258
        # コンテナ内で実行中の場合のみ、残った Chrome プロセスをクリーンアップ
259
        my_lib.chrome_util._cleanup_orphaned_chrome_processes_in_container()
×
260

261
        # プロファイルのロックファイルを削除
262
        my_lib.chrome_util._cleanup_profile_lock(profile_path)
×
263

264
        # 再度健全性チェック
265
        health = my_lib.chrome_util._check_profile_health(profile_path)
×
266
        if not health.is_healthy and auto_recover and (health.has_corrupted_json or health.has_corrupted_db):
×
267
            logging.warning("Profile still corrupted after first attempt, recovering")
×
268
            my_lib.chrome_util._recover_corrupted_profile(profile_path)
×
269

270
        return _create_driver_impl(profile_name, data_path, is_headless, use_subprocess, use_undetected)
×
271

272

273
def xpath_exists(driver: WebDriver, xpath: str) -> bool:
1✔
274
    return len(driver.find_elements(selenium.webdriver.common.by.By.XPATH, xpath)) != 0
×
275

276

277
def get_text(
1✔
278
    driver: WebDriver,
279
    xpath: str,
280
    safe_text: str,
281
    wait: WebDriverWait[WebDriver] | None = None,
282
) -> str:
283
    if wait is not None:
×
284
        wait.until(
×
285
            selenium.webdriver.support.expected_conditions.presence_of_all_elements_located(
286
                (selenium.webdriver.common.by.By.XPATH, xpath)
287
            )
288
        )
289

290
    if len(driver.find_elements(selenium.webdriver.common.by.By.XPATH, xpath)) != 0:
×
291
        return driver.find_element(selenium.webdriver.common.by.By.XPATH, xpath).text.strip()
×
292
    else:
293
        return safe_text
×
294

295

296
def input_xpath(
1✔
297
    driver: WebDriver,
298
    xpath: str,
299
    text: str,
300
    wait: WebDriverWait[WebDriver] | None = None,
301
    is_warn: bool = True,
302
) -> bool:
303
    if wait is not None:
×
304
        wait.until(
×
305
            selenium.webdriver.support.expected_conditions.element_to_be_clickable(
306
                (selenium.webdriver.common.by.By.XPATH, xpath)
307
            )
308
        )
309
        time.sleep(0.05)
×
310

311
    if xpath_exists(driver, xpath):
×
312
        driver.find_element(selenium.webdriver.common.by.By.XPATH, xpath).send_keys(text)
×
313
        return True
×
314
    else:
315
        if is_warn:
×
316
            logging.warning("Element is not found: %s", xpath)
×
317
        return False
×
318

319

320
def click_xpath(
1✔
321
    driver: WebDriver,
322
    xpath: str,
323
    wait: WebDriverWait[WebDriver] | None = None,
324
    is_warn: bool = True,
325
    move: bool = False,
326
) -> bool:
327
    if wait is not None:
×
328
        wait.until(
×
329
            selenium.webdriver.support.expected_conditions.element_to_be_clickable(
330
                (selenium.webdriver.common.by.By.XPATH, xpath)
331
            )
332
        )
333
        time.sleep(0.05)
×
334

335
    if xpath_exists(driver, xpath):
×
336
        elem = driver.find_element(selenium.webdriver.common.by.By.XPATH, xpath)
×
337
        if move:
×
338
            action = selenium.webdriver.common.action_chains.ActionChains(driver)
×
339
            action.move_to_element(elem)
×
340
            action.perform()
×
341

342
        elem.click()
×
343
        return True
×
344
    else:
345
        if is_warn:
×
346
            logging.warning("Element is not found: %s", xpath)
×
347
        return False
×
348

349

350
def is_display(driver: WebDriver, xpath: str) -> bool:
1✔
351
    return (len(driver.find_elements(selenium.webdriver.common.by.By.XPATH, xpath)) != 0) and (
×
352
        driver.find_element(selenium.webdriver.common.by.By.XPATH, xpath).is_displayed()
353
    )
354

355

356
def random_sleep(sec: float) -> None:
1✔
357
    RATIO = 0.8
1✔
358

359
    time.sleep((sec * RATIO) + (sec * (1 - RATIO) * 2) * random.random())  # noqa: S311
1✔
360

361

362
def with_retry(
1✔
363
    func: Callable[[], T],
364
    max_retries: int = 3,
365
    delay: float = 1.0,
366
    exceptions: tuple[type[Exception], ...] = (Exception,),
367
    on_retry: Callable[[int, Exception], bool | None] | None = None,
368
) -> T:
369
    """リトライ付きで関数を実行
370

371
    全て失敗した場合は最後の例外を再スロー。
372
    呼び出し側で try/except してエラー処理を行う。
373

374
    Args:
375
        func: 実行する関数
376
        max_retries: 最大リトライ回数
377
        delay: リトライ間の待機秒数
378
        exceptions: リトライ対象の例外タプル
379
        on_retry: リトライ時のコールバック (attempt, exception)
380
            - None または True を返すとリトライを継続
381
            - False を返すとリトライを中止して例外を再スロー
382

383
    Returns:
384
        成功時は関数の戻り値
385

386
    Raises:
387
        最後の例外を再スロー
388

389
    """
390
    last_exception: Exception | None = None
1✔
391

392
    for attempt in range(max_retries):
1✔
393
        try:
1✔
394
            return func()
1✔
395
        except exceptions as e:
1✔
396
            last_exception = e
1✔
397
            if attempt < max_retries - 1:
1✔
398
                if on_retry:
1✔
399
                    should_continue = on_retry(attempt + 1, e)
1✔
400
                    if should_continue is False:
1✔
401
                        raise
1✔
402
                time.sleep(delay)
1✔
403

404
    if last_exception:
1✔
405
        raise last_exception
1✔
406
    raise RuntimeError("Unexpected state in with_retry")
×
407

408

409
def with_session_retry(
1✔
410
    func: Callable[[], T],
411
    driver_name: str,
412
    data_dir: pathlib.Path,
413
    *,
414
    max_retries: int = 1,
415
    clear_profile_on_error: bool = True,
416
    on_retry: Callable[[int, int], None] | None = None,
417
    before_retry: Callable[[], None] | None = None,
418
) -> T:
419
    """InvalidSessionIdException 発生時にプロファイル削除してリトライ
420

421
    ブラウザのセッションが無効になった場合(クラッシュ等)、
422
    プロファイルを削除して再起動することで復旧を試みる。
423

424
    Args:
425
        func: 実行する関数
426
        driver_name: Chrome プロファイル名
427
        data_dir: Selenium データディレクトリ
428
        max_retries: 最大リトライ回数 (デフォルト: 1)
429
        clear_profile_on_error: エラー時にプロファイルを削除するか
430
        on_retry: リトライ時のコールバック (attempt, max_retries) - ステータス更新用
431
        before_retry: リトライ前のコールバック - quit_selenium() 呼び出し用
432

433
    Returns:
434
        成功時は関数の戻り値
435

436
    Raises:
437
        InvalidSessionIdException: リトライ上限に達した場合
438

439
    """
440

441
    def retry_handler(attempt: int, exception: Exception) -> bool | None:
1✔
442
        if before_retry:
1✔
443
            before_retry()
1✔
444

445
        if attempt <= max_retries and clear_profile_on_error:
1✔
446
            logging.warning(
1✔
447
                "セッションエラーが発生しました。プロファイルを削除してリトライします(%d/%d)",
448
                attempt,
449
                max_retries,
450
            )
451
            if on_retry:
1✔
452
                on_retry(attempt, max_retries)
1✔
453
            my_lib.chrome_util.delete_profile(driver_name, data_dir)
1✔
454
            return True
1✔
455
        return False
1✔
456

457
    return with_retry(
1✔
458
        func,
459
        max_retries=max_retries + 1,
460
        delay=0,
461
        exceptions=(selenium.common.exceptions.InvalidSessionIdException,),
462
        on_retry=retry_handler,
463
    )
464

465

466
def wait_patiently(
1✔
467
    driver: WebDriver,
468
    wait: WebDriverWait[WebDriver],
469
    target: Any,
470
) -> None:
471
    error: selenium.common.exceptions.TimeoutException | None = None
×
472
    for i in range(WAIT_RETRY_COUNT + 1):
×
473
        try:
×
474
            wait.until(target)
×
475
            return
×
476
        except selenium.common.exceptions.TimeoutException as e:
×
477
            logging.warning(
×
478
                "タイムアウトが発生しました。(%s in %s line %d)",
479
                inspect.stack()[1].function,
480
                inspect.stack()[1].filename,
481
                inspect.stack()[1].lineno,
482
            )
483
            error = e
×
484

485
            logging.info(i)
×
486
            if i != WAIT_RETRY_COUNT:
×
487
                logging.info("refresh")
×
488
                driver.refresh()
×
489

490
    if error is not None:
×
491
        raise error
×
492

493

494
def dump_page(
1✔
495
    driver: WebDriver,
496
    index: int,
497
    dump_path: pathlib.Path,
498
    stack_index: int = 1,
499
) -> None:
500
    name = inspect.stack()[stack_index].function.replace("<", "").replace(">", "")
×
501

502
    dump_path.mkdir(parents=True, exist_ok=True)
×
503

504
    png_path = dump_path / f"{name}_{index:02d}.png"
×
505
    htm_path = dump_path / f"{name}_{index:02d}.htm"
×
506

507
    driver.save_screenshot(str(png_path))
×
508

509
    with htm_path.open("w", encoding="utf-8") as f:
×
510
        f.write(driver.page_source)
×
511

512
    logging.info(
×
513
        "page dump: %02d from %s in %s line %d",
514
        index,
515
        inspect.stack()[stack_index].function,
516
        inspect.stack()[stack_index].filename,
517
        inspect.stack()[stack_index].lineno,
518
    )
519

520

521
def clear_cache(driver: WebDriver) -> None:
1✔
522
    driver.execute_cdp_cmd("Network.clearBrowserCache", {})
×
523

524

525
def set_japanese_locale(driver: WebDriver) -> None:
1✔
526
    """CDP を使って日本語ロケールを強制設定
527

528
    Chrome の起動オプションだけでは言語設定が変わってしまうことがあるため、
529
    CDP を使って Accept-Language ヘッダーとロケールを強制的に日本語に設定する。
530
    """
531
    try:
×
532
        # NOTE: Network.setExtraHTTPHeaders は Network.enable を先に呼ばないと機能しない
533
        driver.execute_cdp_cmd("Network.enable", {})
×
534
        driver.execute_cdp_cmd(
×
535
            "Network.setExtraHTTPHeaders",
536
            {"headers": {"Accept-Language": "ja-JP,ja;q=0.9"}},
537
        )
538
        driver.execute_cdp_cmd(
×
539
            "Emulation.setLocaleOverride",
540
            {"locale": "ja-JP"},
541
        )
542
        logging.debug("Japanese locale set via CDP")
×
543
    except Exception:
×
544
        logging.warning("Failed to set Japanese locale via CDP")
×
545

546

547
def clean_dump(dump_path: pathlib.Path, keep_days: int = 1) -> None:
1✔
548
    if not dump_path.exists():
1✔
549
        return
1✔
550

551
    time_threshold = datetime.timedelta(keep_days)
1✔
552

553
    for item in dump_path.iterdir():
1✔
554
        if not item.is_file():
1✔
555
            continue
1✔
556
        try:
1✔
557
            time_diff = datetime.datetime.now(datetime.UTC) - datetime.datetime.fromtimestamp(
1✔
558
                item.stat().st_mtime, datetime.UTC
559
            )
560
        except FileNotFoundError:
×
561
            # ファイルが別プロセスにより削除された場合(SQLiteの一時ファイルなど)
562
            continue
×
563
        if time_diff > time_threshold:
1✔
564
            logging.warning("remove %s [%s day(s) old].", item.absolute(), f"{time_diff.days:,}")
1✔
565

566
            item.unlink(missing_ok=True)
1✔
567

568

569
def get_memory_info(driver: WebDriver) -> dict[str, Any]:
1✔
570
    """ブラウザのメモリ使用量を取得(単位: KB)"""
571
    total_bytes = subprocess.Popen(  # noqa: S602
×
572
        "smem -t -c pss -P chrome | tail -n 1",  # noqa: S607
573
        shell=True,
574
        stdout=subprocess.PIPE,
575
    ).communicate()[0]
576
    total = int(str(total_bytes, "utf-8").strip())  # smem の出力は KB 単位
×
577

578
    try:
×
579
        memory_info = driver.execute_cdp_cmd("Memory.getAllTimeSamplingProfile", {})
×
580
        heap_usage = driver.execute_cdp_cmd("Runtime.getHeapUsage", {})
×
581

582
        heap_used = heap_usage.get("usedSize", 0) // 1024  # bytes → KB
×
583
        heap_total = heap_usage.get("totalSize", 0) // 1024  # bytes → KB
×
584
    except Exception as e:
×
585
        logging.debug("Failed to get memory usage: %s", e)
×
586

587
        memory_info = None
×
588
        heap_used = 0
×
589
        heap_total = 0
×
590

591
    return {
×
592
        "total": total,
593
        "heap_used": heap_used,
594
        "heap_total": heap_total,
595
        "memory_info": memory_info,
596
    }
597

598

599
def log_memory_usage(driver: WebDriver) -> None:
1✔
600
    mem_info = get_memory_info(driver)
×
601
    logging.info(
×
602
        "Chrome memory: %s MB (JS heap: %s MB)",
603
        f"""{mem_info["total"] // 1024:,}""",
604
        f"""{mem_info["heap_used"] // 1024:,}""",
605
    )
606

607

608
def _warmup(
1✔
609
    driver: WebDriver,
610
    keyword: str,
611
    url_pattern: str,
612
    sleep_sec: int = 3,
613
) -> None:
614
    # NOTE: ダミーアクセスを行って BOT ではないと思わせる。(効果なさそう...)
615
    driver.get("https://www.yahoo.co.jp/")
×
616
    time.sleep(sleep_sec)
×
617

618
    driver.find_element(selenium.webdriver.common.by.By.XPATH, '//input[@name="p"]').send_keys(keyword)
×
619
    driver.find_element(selenium.webdriver.common.by.By.XPATH, '//input[@name="p"]').send_keys(
×
620
        selenium.webdriver.common.keys.Keys.ENTER
621
    )
622

623
    time.sleep(sleep_sec)
×
624

625
    driver.find_element(
×
626
        selenium.webdriver.common.by.By.XPATH, f'//a[contains(@href, "{url_pattern}")]'
627
    ).click()
628

629
    time.sleep(sleep_sec)
×
630

631

632
class browser_tab:
1✔
633
    """新しいブラウザタブで URL を開くコンテキストマネージャ"""
634

635
    def __init__(self, driver: WebDriver, url: str) -> None:
1✔
636
        """初期化
637

638
        Args:
639
            driver: WebDriver インスタンス
640
            url: 開く URL
641

642
        """
643
        self.driver = driver
1✔
644
        self.url = url
1✔
645
        self.original_window: str | None = None
1✔
646

647
    def __enter__(self) -> None:
1✔
648
        """新しいタブを開いて URL にアクセス"""
649
        self.original_window = self.driver.current_window_handle
1✔
650
        self.driver.execute_script("window.open('');")
1✔
651
        self.driver.switch_to.window(self.driver.window_handles[-1])
1✔
652
        try:
1✔
653
            self.driver.get(self.url)
1✔
654
        except Exception:
×
655
            # NOTE: URL読み込みに失敗した場合もクリーンアップしてから例外を再送出
656
            self._cleanup()
×
657
            raise
×
658

659
    def _cleanup(self) -> None:
1✔
660
        """タブを閉じて元のウィンドウに戻る"""
661
        try:
1✔
662
            # 余分なタブを閉じる
663
            while len(self.driver.window_handles) > 1:
1✔
664
                self.driver.switch_to.window(self.driver.window_handles[-1])
1✔
665
                self.driver.close()
1✔
666
            if self.original_window is not None:
1✔
667
                self.driver.switch_to.window(self.original_window)
1✔
668
            time.sleep(0.1)
1✔
669
        except Exception:
1✔
670
            # NOTE: Chromeがクラッシュした場合は無視(既に終了しているため操作不可)
671
            logging.exception("タブのクリーンアップに失敗しました(Chromeがクラッシュした可能性があります)")
1✔
672

673
    def _recover_from_error(self) -> None:
1✔
674
        """エラー後にブラウザの状態を回復する"""
675
        try:
×
676
            # ページロードタイムアウトをリセット(負の値になっている可能性があるため)
677
            self.driver.set_page_load_timeout(30)
×
678

679
            # about:blank に移動してレンダラーの状態をリセット
680
            self.driver.get("about:blank")
×
681
            time.sleep(0.5)
×
682
        except Exception:
×
683
            logging.warning("ブラウザの回復に失敗しました")
×
684

685
    def __exit__(
1✔
686
        self,
687
        exception_type: type[BaseException] | None,
688
        exception_value: BaseException | None,
689
        traceback: types.TracebackType | None,
690
    ) -> None:
691
        """タブを閉じて元のウィンドウに戻る"""
692
        self._cleanup()
1✔
693

694
        # 例外が発生した場合はブラウザの状態を回復
695
        if exception_type is not None:
1✔
696
            self._recover_from_error()
×
697

698

699
class error_handler:
1✔
700
    """Selenium操作時のエラーハンドリング用コンテキストマネージャ
701

702
    エラー発生時に自動でログ出力、スクリーンショット取得、コールバック呼び出しを行う。
703

704
    Args:
705
        driver: WebDriver インスタンス
706
        message: ログに出力するエラーメッセージ
707
        on_error: エラー時に呼ばれるコールバック関数
708
            (exception, screenshot: PIL.Image.Image | None, page_source: str | None) -> None
709
        capture_screenshot: スクリーンショットを自動取得するか(デフォルト: True)
710
        reraise: 例外を再送出するか(デフォルト: True)
711

712
    Attributes:
713
        exception: 発生した例外(エラーがなければ None)
714
        screenshot: 取得したスクリーンショット(PIL.Image.Image、取得失敗時は None)
715
        page_source: 取得したページソース(取得失敗時は None)
716

717
    Examples:
718
        基本的な使用方法::
719

720
            with my_lib.selenium_util.error_handler(driver, message="ログイン処理に失敗") as handler:
721
                driver.get(login_url)
722
                driver.find_element(...).click()
723

724
        コールバック付き(Slack通知など)::
725

726
            def notify(exc, screenshot, page_source):
727
                slack.notify_error_with_page(config, "エラー発生", exc, screenshot, page_source)
728

729
            with my_lib.selenium_util.error_handler(
730
                driver,
731
                message="クロール処理に失敗",
732
                on_error=notify,
733
            ):
734
                crawl_page(driver)
735

736
        例外を抑制して続行::
737

738
            with my_lib.selenium_util.error_handler(driver, reraise=False) as handler:
739
                risky_operation()
740

741
            if handler.exception:
742
                logging.warning("処理をスキップしました")
743

744
    """
745

746
    def __init__(
1✔
747
        self,
748
        driver: WebDriver,
749
        message: str = "Selenium operation failed",
750
        on_error: Callable[[Exception, PIL.Image.Image | None, str | None], None] | None = None,
751
        capture_screenshot: bool = True,
752
        reraise: bool = True,
753
    ) -> None:
754
        """初期化"""
755
        self.driver = driver
1✔
756
        self.message = message
1✔
757
        self.on_error = on_error
1✔
758
        self.capture_screenshot = capture_screenshot
1✔
759
        self.reraise = reraise
1✔
760
        self.exception: Exception | None = None
1✔
761
        self.screenshot: PIL.Image.Image | None = None
1✔
762
        self.page_source: str | None = None
1✔
763

764
    def __enter__(self) -> Self:
1✔
765
        """コンテキストマネージャの開始"""
766
        return self
1✔
767

768
    def __exit__(
1✔
769
        self,
770
        exception_type: type[BaseException] | None,
771
        exception_value: BaseException | None,
772
        traceback: types.TracebackType | None,
773
    ) -> bool:
774
        """コンテキストマネージャの終了、エラー処理を実行"""
775
        if exception_value is None:
1✔
776
            return False
1✔
777

778
        # 例外を記録
779
        if isinstance(exception_value, Exception):
1✔
780
            self.exception = exception_value
1✔
781
        else:
782
            # BaseException(KeyboardInterrupt など)は処理せず再送出
783
            return False
×
784

785
        # ログ出力
786
        logging.exception(self.message)
1✔
787

788
        # スクリーンショット・ページソース取得
789
        if self.capture_screenshot:
1✔
790
            try:
1✔
791
                self.page_source = self.driver.page_source
1✔
792
            except Exception:
×
793
                logging.debug("Failed to capture page source for error handling")
×
794
            try:
1✔
795
                screenshot_bytes = self.driver.get_screenshot_as_png()
1✔
796
                self.screenshot = PIL.Image.open(io.BytesIO(screenshot_bytes))
1✔
797
            except Exception:
1✔
798
                logging.debug("Failed to capture screenshot for error handling")
1✔
799

800
        # コールバック呼び出し
801
        if self.on_error is not None:
1✔
802
            try:
1✔
803
                self.on_error(self.exception, self.screenshot, self.page_source)
1✔
804
            except Exception:
×
805
                logging.exception("Error in on_error callback")
×
806

807
        # reraise=False なら例外を抑制
808
        return not self.reraise
1✔
809

810

811
def _is_chrome_related_process(process: psutil.Process) -> bool:
1✔
812
    """プロセスがChrome関連かどうかを判定"""
813
    try:
1✔
814
        process_name = process.name().lower()
1✔
815
        # Chrome関連のプロセス名パターン
816
        chrome_patterns = ["chrome", "chromium", "google-chrome", "undetected_chro"]
1✔
817
        # chromedriverは除外
818
        if "chromedriver" in process_name:
1✔
819
            return False
1✔
820
        return any(pattern in process_name for pattern in chrome_patterns)
1✔
821
    except (psutil.NoSuchProcess, psutil.AccessDenied):
1✔
822
        return False
1✔
823

824

825
def _get_chrome_processes_by_pgid(chromedriver_pid: int, existing_pids: set[int]) -> list[int]:
1✔
826
    """プロセスグループIDで追加のChrome関連プロセスを取得"""
827
    additional_pids = []
×
828
    try:
×
829
        pgid = os.getpgid(chromedriver_pid)
×
830
        for proc in psutil.process_iter(["pid", "name", "ppid"]):
×
831
            if proc.info["pid"] in existing_pids:
×
832
                continue
×
833
            try:
×
834
                if os.getpgid(proc.info["pid"]) == pgid:
×
835
                    proc_obj = psutil.Process(proc.info["pid"])
×
836
                    if _is_chrome_related_process(proc_obj):
×
837
                        additional_pids.append(proc.info["pid"])
×
838
                        logging.debug(
×
839
                            "Found Chrome-related process by pgid: PID %d, name: %s",
840
                            proc.info["pid"],
841
                            proc.info["name"],
842
                        )
843
            except (psutil.NoSuchProcess, psutil.AccessDenied, OSError):
×
844
                pass
×
845
    except (OSError, psutil.NoSuchProcess):
×
846
        logging.debug("Failed to get process group ID for chromedriver")
×
847
    return additional_pids
×
848

849

850
def _get_chrome_related_processes(driver: WebDriver) -> list[int]:
1✔
851
    """Chrome関連の全子プロセスを取得
852

853
    undetected_chromedriver 使用時、Chrome プロセスは chromedriver の子ではなく
854
    Python プロセスの直接の子として起動されることがあるため、両方を検索する。
855
    """
856
    chrome_pids = set()
×
857

858
    # 1. driver.service.process の子プロセスを検索
859
    # NOTE: Chrome/Firefox WebDriver には service 属性があるが、型スタブでは定義されていない
860
    try:
×
NEW
861
        if hasattr(driver, "service") and driver.service and hasattr(driver.service, "process"):  # type: ignore[union-attr]
×
NEW
862
            process = driver.service.process  # type: ignore[union-attr]
×
863
            if process and hasattr(process, "pid"):
×
864
                chromedriver_pid = process.pid
×
865

866
                # psutilでプロセス階層を取得
867
                parent_process = psutil.Process(chromedriver_pid)
×
868
                children = parent_process.children(recursive=True)
×
869

870
                for child in children:
×
871
                    chrome_pids.add(child.pid)
×
872
                    logging.debug(
×
873
                        "Found Chrome-related process (service child): PID %d, name: %s",
874
                        child.pid,
875
                        child.name(),
876
                    )
877
    except Exception:
×
878
        logging.exception("Failed to get Chrome-related processes from service")
×
879

880
    # 2. 現在の Python プロセスの全子孫から Chrome 関連プロセスを検索
881
    try:
×
882
        current_process = psutil.Process()
×
883
        all_children = current_process.children(recursive=True)
×
884

885
        for child in all_children:
×
886
            if child.pid in chrome_pids:
×
887
                continue
×
888
            try:
×
889
                if _is_chrome_related_process(child):
×
890
                    chrome_pids.add(child.pid)
×
891
                    logging.debug(
×
892
                        "Found Chrome-related process (python child): PID %d, name: %s",
893
                        child.pid,
894
                        child.name(),
895
                    )
896
            except (psutil.NoSuchProcess, psutil.AccessDenied):
×
897
                pass
×
898
    except Exception:
×
899
        logging.exception("Failed to get Chrome-related processes from python children")
×
900

901
    return list(chrome_pids)
×
902

903

904
def _send_signal_to_processes(pids: list[int], sig: signal.Signals, signal_name: str) -> None:
1✔
905
    """プロセスリストに指定されたシグナルを送信"""
906
    errors = []
×
907
    for pid in pids:
×
908
        try:
×
909
            # プロセス名を取得
910
            try:
×
911
                process = psutil.Process(pid)
×
912
                process_name = process.name()
×
913
            except (psutil.NoSuchProcess, psutil.AccessDenied):
×
914
                process_name = "unknown"
×
915

916
            if sig == signal.SIGKILL:
×
917
                # プロセスがまだ存在するかチェック
918
                os.kill(pid, 0)  # シグナル0は存在確認
×
919
            os.kill(pid, sig)
×
920
            logging.info("Sent %s to process: PID %d (%s)", signal_name, pid, process_name)
×
921
        except (ProcessLookupError, OSError) as e:
×
922
            # プロセスが既に終了している場合は無視
923
            errors.append((pid, e))
×
924

925
    # エラーが発生した場合はまとめてログ出力
926
    if errors:
×
927
        logging.debug("Failed to send %s to some processes: %s", signal_name, errors)
×
928

929

930
def _terminate_chrome_processes(chrome_pids: list[int], timeout: float = 5.0) -> None:
1✔
931
    """Chrome関連プロセスを段階的に終了
932

933
    Args:
934
        chrome_pids: 終了対象のプロセスIDリスト
935
        timeout: SIGTERM後にプロセス終了を待機する最大時間(秒)
936

937
    """
938
    if not chrome_pids:
×
939
        return
×
940

941
    # 優雅な終了(SIGTERM)
942
    _send_signal_to_processes(chrome_pids, signal.SIGTERM, "SIGTERM")
×
943

944
    # プロセスの終了を待機(ポーリング)
945
    remaining_pids = list(chrome_pids)
×
946
    poll_interval = 0.2
×
947
    elapsed = 0.0
×
948

949
    while remaining_pids and elapsed < timeout:
×
950
        time.sleep(poll_interval)
×
951
        elapsed += poll_interval
×
952

953
        # まだ生存しているプロセスをチェック
954
        still_alive = []
×
955
        for pid in remaining_pids:
×
956
            try:
×
957
                if psutil.pid_exists(pid):
×
958
                    process = psutil.Process(pid)
×
959
                    if process.is_running() and process.status() != psutil.STATUS_ZOMBIE:
×
960
                        still_alive.append(pid)
×
961
            except (psutil.NoSuchProcess, psutil.AccessDenied):
×
962
                pass
×
963

964
        remaining_pids = still_alive
×
965

966
    # タイムアウト後もまだ残っているプロセスにのみ SIGKILL を送信
967
    if remaining_pids:
×
968
        logging.warning(
×
969
            "Chrome processes still alive after %.1fs, sending SIGKILL to %d processes",
970
            elapsed,
971
            len(remaining_pids),
972
        )
973
        _send_signal_to_processes(remaining_pids, signal.SIGKILL, "SIGKILL")
×
974

975

976
def _reap_single_process(pid: int) -> None:
1✔
977
    """単一プロセスをwaitpidで回収"""
978
    try:
×
979
        # ノンブロッキングでwaitpid
980
        result_pid, status = os.waitpid(pid, os.WNOHANG)
×
981
        if result_pid == pid:
×
982
            # プロセス名を取得
983
            try:
×
984
                process = psutil.Process(pid)
×
985
                process_name = process.name()
×
986
            except (psutil.NoSuchProcess, psutil.AccessDenied):
×
987
                process_name = "unknown"
×
988
            logging.debug("Reaped Chrome process: PID %d (%s)", pid, process_name)
×
989
    except (ChildProcessError, OSError):
×
990
        # 子プロセスでない場合や既に回収済みの場合は無視
991
        pass
×
992

993

994
def _reap_chrome_processes(chrome_pids: list[int]) -> None:
1✔
995
    """Chrome関連プロセスを明示的に回収してゾンビ化を防ぐ"""
996
    for pid in chrome_pids:
×
997
        _reap_single_process(pid)
×
998

999

1000
def _get_remaining_chrome_pids(chrome_pids: list[int]) -> list[int]:
1✔
1001
    """指定されたPIDリストから、まだ生存しているChrome関連プロセスを取得"""
1002
    remaining = []
×
1003
    for pid in chrome_pids:
×
1004
        try:
×
1005
            if psutil.pid_exists(pid):
×
1006
                process = psutil.Process(pid)
×
1007
                if (
×
1008
                    process.is_running()
1009
                    and process.status() != psutil.STATUS_ZOMBIE
1010
                    and _is_chrome_related_process(process)
1011
                ):
1012
                    remaining.append(pid)
×
1013
        except (psutil.NoSuchProcess, psutil.AccessDenied):
×
1014
            pass
×
1015
    return remaining
×
1016

1017

1018
def _wait_for_processes_with_check(
1✔
1019
    chrome_pids: list[int],
1020
    timeout: float,
1021
    poll_interval: float = 0.2,
1022
    log_interval: float = 1.0,
1023
) -> list[int]:
1024
    """プロセスの終了を待機しつつ、残存プロセスをチェック
1025

1026
    Args:
1027
        chrome_pids: 監視対象のプロセスIDリスト
1028
        timeout: 最大待機時間(秒)
1029
        poll_interval: チェック間隔(秒)
1030
        log_interval: ログ出力間隔(秒)
1031

1032
    Returns:
1033
        タイムアウト後も残存しているプロセスIDのリスト
1034

1035
    """
1036
    elapsed = 0.0
×
1037
    last_log_time = 0.0
×
1038
    remaining_pids = list(chrome_pids)
×
1039

1040
    while remaining_pids and elapsed < timeout:
×
1041
        time.sleep(poll_interval)
×
1042
        elapsed += poll_interval
×
1043
        remaining_pids = _get_remaining_chrome_pids(remaining_pids)
×
1044

1045
        if remaining_pids and (elapsed - last_log_time) >= log_interval:
×
1046
            logging.info(
×
1047
                "Found %d remaining Chrome processes after %.0fs",
1048
                len(remaining_pids),
1049
                elapsed,
1050
            )
1051
            last_log_time = elapsed
×
1052

1053
    return remaining_pids
×
1054

1055

1056
def quit_driver_gracefully(
1✔
1057
    driver: WebDriver | None,
1058
    wait_sec: float = 5.0,
1059
    sigterm_wait_sec: float = 5.0,
1060
    sigkill_wait_sec: float = 5.0,
1061
) -> None:
1062
    """Chrome WebDriverを確実に終了する
1063

1064
    終了フロー:
1065
    1. driver.quit() を呼び出し
1066
    2. wait_sec 秒待機しつつプロセス終了をチェック
1067
    3. 残存プロセスがあれば SIGTERM を送信
1068
    4. sigterm_wait_sec 秒待機しつつプロセス終了をチェック
1069
    5. 残存プロセスがあれば SIGKILL を送信
1070
    6. sigkill_wait_sec 秒待機
1071

1072
    Args:
1073
        driver: 終了する WebDriver インスタンス
1074
        wait_sec: quit 後にプロセス終了を待機する秒数(デフォルト: 5秒)
1075
        sigterm_wait_sec: SIGTERM 後にプロセス終了を待機する秒数(デフォルト: 5秒)
1076
        sigkill_wait_sec: SIGKILL 後にプロセス回収を待機する秒数(デフォルト: 5秒)
1077

1078
    """
1079
    if driver is None:
1✔
1080
        return
1✔
1081

1082
    # quit前にChrome関連プロセスを記録
1083
    chrome_pids_before = _get_chrome_related_processes(driver)
1✔
1084

1085
    try:
1✔
1086
        # WebDriverの正常終了を試行(これがタブのクローズも含む)
1087
        driver.quit()
1✔
1088
        logging.info("WebDriver quit successfully")
1✔
1089
    except Exception:
1✔
1090
        logging.warning("Failed to quit driver normally", exc_info=True)
1✔
1091
    finally:
1092
        # undetected_chromedriver の __del__ がシャットダウン時に再度呼ばれるのを防ぐ
1093
        if hasattr(driver, "_has_quit"):
1✔
1094
            driver._has_quit = True  # type: ignore[attr-defined]
1✔
1095

1096
    # ChromeDriverサービスの停止を試行
1097
    try:
1✔
1098
        if hasattr(driver, "service") and driver.service and hasattr(driver.service, "stop"):  # type: ignore[union-attr]
1✔
NEW
1099
            driver.service.stop()  # type: ignore[union-attr]
×
UNCOV
1100
    except (ConnectionResetError, OSError):
×
1101
        # Chrome が既に終了している場合は無視
1102
        logging.debug("Chrome service already stopped")
×
1103
    except Exception:
×
1104
        logging.warning("Failed to stop Chrome service", exc_info=True)
×
1105

1106
    # Step 1: quit 後に wait_sec 秒待機しつつプロセス終了をチェック
1107
    remaining_pids = _wait_for_processes_with_check(chrome_pids_before, wait_sec)
1✔
1108

1109
    if not remaining_pids:
1✔
1110
        logging.debug("All Chrome processes exited normally")
1✔
1111
        return
1✔
1112

1113
    # Step 2: 残存プロセスに SIGTERM を送信
1114
    logging.info(
×
1115
        "Found %d remaining Chrome processes after %.0fs, sending SIGTERM",
1116
        len(remaining_pids),
1117
        wait_sec,
1118
    )
1119
    _send_signal_to_processes(remaining_pids, signal.SIGTERM, "SIGTERM")
×
1120

1121
    # Step 3: SIGTERM 後に sigterm_wait_sec 秒待機しつつプロセス終了をチェック
1122
    remaining_pids = _wait_for_processes_with_check(remaining_pids, sigterm_wait_sec)
×
1123

1124
    if not remaining_pids:
×
1125
        logging.info("All Chrome processes exited after SIGTERM")
×
1126
        _reap_chrome_processes(chrome_pids_before)
×
1127
        return
×
1128

1129
    # Step 4: 残存プロセスに SIGKILL を送信
1130
    logging.warning(
×
1131
        "Chrome processes still alive after SIGTERM + %.1fs, sending SIGKILL to %d processes",
1132
        sigterm_wait_sec,
1133
        len(remaining_pids),
1134
    )
1135
    _send_signal_to_processes(remaining_pids, signal.SIGKILL, "SIGKILL")
×
1136

1137
    # Step 5: SIGKILL 後に sigkill_wait_sec 秒待機してプロセス回収
1138
    time.sleep(sigkill_wait_sec)
×
1139
    _reap_chrome_processes(chrome_pids_before)
×
1140

1141
    # 最終チェック:まだ残っているプロセスがあるか確認
1142
    still_remaining = _get_remaining_chrome_pids(remaining_pids)
×
1143

1144
    # 回収できなかったプロセスについて警告
1145
    if still_remaining:
×
1146
        for pid in still_remaining:
×
1147
            try:
×
1148
                process = psutil.Process(pid)
×
1149
                logging.warning("Failed to collect Chrome-related process: PID %d (%s)", pid, process.name())
×
1150
            except (psutil.NoSuchProcess, psutil.AccessDenied):
×
1151
                pass
×
1152

1153

1154
if __name__ == "__main__":
1155
    import pathlib
1156

1157
    import docopt
1158
    import selenium.webdriver.support.wait
1159

1160
    import my_lib.config
1161
    import my_lib.logger
1162

1163
    assert __doc__ is not None  # noqa: S101
1164
    args = docopt.docopt(__doc__)
1165

1166
    config_file = args["-c"]
1167
    debug_mode = args["-D"]
1168

1169
    my_lib.logger.init("test", level=logging.DEBUG if debug_mode else logging.INFO)
1170

1171
    config = my_lib.config.load(config_file)
1172

1173
    driver = create_driver("test", pathlib.Path(config["data"]["selenium"]))
1174
    wait = selenium.webdriver.support.wait.WebDriverWait(driver, 5)
1175

1176
    driver.get("https://www.google.com/")
1177
    wait.until(
1178
        selenium.webdriver.support.expected_conditions.presence_of_element_located(
1179
            (selenium.webdriver.common.by.By.XPATH, '//input[contains(@value, "Google")]')
1180
        )
1181
    )
1182

1183
    quit_driver_gracefully(driver)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc