• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bramstroker / homeassistant-state-webhook / 12454045877

22 Dec 2024 12:03PM UTC coverage: 95.425% (+1.6%) from 93.836%
12454045877

Pull #11

github

bramstroker
feat: implement retry
Pull Request #11: feat: implement retry

11 of 11 new or added lines in 3 files covered. (100.0%)

7 existing lines in 1 file now uncovered.

146 of 153 relevant lines covered (95.42%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.31
/custom_components/state_webhook/__init__.py
1
import asyncio
1✔
2
import fnmatch
1✔
3
import logging
1✔
4
from collections.abc import Mapping
1✔
5
from typing import Any
1✔
6

7
import aiohttp
1✔
8
import homeassistant.helpers.entity_registry as er
1✔
9
from homeassistant.config_entries import ConfigEntry
1✔
10
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
1✔
11
from homeassistant.core import Event, EventStateChangedData, HomeAssistant, State, callback
1✔
12
from homeassistant.helpers.event import async_track_state_change_event
1✔
13
from homeassistant.helpers.start import async_at_started
1✔
14

15
from .const import (
1✔
16
    CONF_ENTITY_DOMAIN,
17
    CONF_ENTITY_ID,
18
    CONF_ENTITY_ID_GLOB,
19
    CONF_ENTITY_LABELS,
20
    CONF_FILTER_MODE,
21
    CONF_PAYLOAD_ATTRIBUTES,
22
    CONF_PAYLOAD_OLD_STATE,
23
    CONF_WEBHOOK_AUTH_HEADER,
24
    CONF_WEBHOOK_HEADERS,
25
    CONF_WEBHOOK_URL,
26
    FilterMode,
27
)
28

29
_LOGGER = logging.getLogger(__name__)
1✔
30

31
RETRY_DELAY = 5
1✔
32

33

34
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
1✔
35
    async def _register_webhook(_: Any) -> None:  # noqa ANN401
1✔
36
        await register_webhook(hass, entry)
1✔
37

38
    async_at_started(hass, _register_webhook)
1✔
39

40
    return True
1✔
41

42

43
async def register_webhook(hass: HomeAssistant, entry: ConfigEntry) -> None:
1✔
44
    """Register webhook for state changes."""
45

46
    entities_to_track = await resolve_tracking_entities(hass, entry)
1✔
47
    if not entities_to_track:
1✔
48
        _LOGGER.warning("No entities found to track")
1✔
49
        return
1✔
50

51
    webhook_url = str(entry.options.get(CONF_WEBHOOK_URL))
1✔
52
    headers = prepare_headers(entry.options)
1✔
53

54
    _LOGGER.debug("Start webhook tracking using URL: %s", webhook_url)
1✔
55
    _LOGGER.debug("Tracking the following entities: %s", entities_to_track)
1✔
56

57
    # Create a single http session for reuse
58
    session = aiohttp.ClientSession()
1✔
59

60
    async def cleanup_session(_: Any) -> None:  # noqa ANN401
1✔
61
        """Cleanup the aiohttp session on shutdown."""
62
        await session.close()
1✔
63

64
    hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, cleanup_session)
1✔
65

66
    @callback
1✔
67
    async def handle_state_change(event: Event[EventStateChangedData]) -> None:
1✔
68
        entity_id = event.data.get("entity_id")
1✔
69
        old_state = event.data.get("old_state")
1✔
70
        new_state = event.data.get("new_state")
1✔
71

72
        if new_state is None:
1✔
UNCOV
73
            return
×
74

75
        _LOGGER.debug(
1✔
76
            "State change detected for %s: %s -> %s",
77
            entity_id,
78
            old_state.state if old_state else "None",
79
            new_state.state,
80
        )
81

82
        result = False
1✔
83
        retry_count = 0
1✔
84
        while not result and retry_count < 3:
1✔
85
            result = await call_webhook(
1✔
86
                session,
87
                webhook_url,
88
                headers,
89
                build_payload(entry.options, entity_id, old_state, new_state),
90
            )
91
            retry_count += 1
1✔
92
            await asyncio.sleep(RETRY_DELAY)
1✔
93

94
    async_track_state_change_event(hass, entities_to_track, handle_state_change)
1✔
95

96
async def call_webhook(session: aiohttp.ClientSession, webhook_url: str, headers: Mapping[str, str], payload: dict[str, Any]) -> bool:
1✔
97
    """Call webhook with custom payload"""
98

99
    _LOGGER.debug("Calling webhook using URL: %s", webhook_url)
1✔
100

101
    try:
1✔
102
        async with session.post(webhook_url, json=payload, headers=headers) as response:
1✔
103
            if 200 <= response.status < 300:
1✔
104
                _LOGGER.debug("Webhook successfully called")
1✔
105
                return True
1✔
106
            _LOGGER.error("Webhook failed, HTTP status: %d", response.status)
1✔
UNCOV
107
    except Exception as e:  # noqa BLE001
×
UNCOV
108
        _LOGGER.error("Error calling webhook: %s", e)
×
109
    return False
1✔
110

111
def build_payload(options: Mapping[str, Any], entity_id: str, old_state: State | None, new_state: State) -> dict[str, Any]:
1✔
112
    """Build payload for webhook request"""
113
    payload = {
1✔
114
        "entity_id": entity_id,
115
        "time": new_state.last_updated.isoformat(),
116
        "new_state": new_state.state,
117
    }
118

119
    include_old_state = bool(options.get(CONF_PAYLOAD_OLD_STATE, True))
1✔
120
    if include_old_state and old_state:
1✔
121
        payload["old_state"] = old_state.state
1✔
122

123
    include_attributes = bool(options.get(CONF_PAYLOAD_ATTRIBUTES, False))
1✔
124
    if include_attributes:
1✔
125
        payload["new_state_attributes"] = new_state.attributes
1✔
126

127
    return payload
1✔
128

129
def prepare_headers(options: Mapping[str, Any]) -> dict[str, str]:
1✔
130
    """Prepare headers for webhook request"""
131
    headers = options.get(CONF_WEBHOOK_HEADERS) or {}
1✔
132
    auth_header = options.get(CONF_WEBHOOK_AUTH_HEADER)
1✔
133
    if auth_header:
1✔
UNCOV
134
        headers["Authorization"] = auth_header
×
135
    return headers
1✔
136

137
async def resolve_tracking_entities(hass: HomeAssistant, entry: ConfigEntry) -> set[str]:
1✔
138
    """Resolve entities to track based on conditions"""
139
    filter_mode: FilterMode = FilterMode(entry.options.get(CONF_FILTER_MODE, FilterMode.OR))
1✔
140

141
    entity_id_glob: str | None = entry.options.get(CONF_ENTITY_ID_GLOB)
1✔
142
    entity_ids: list[str] | None = entry.options.get(CONF_ENTITY_ID)
1✔
143
    domain: str | None = entry.options.get(CONF_ENTITY_DOMAIN)
1✔
144
    labels: list[str] | None = entry.options.get(CONF_ENTITY_LABELS)
1✔
145

146
    glob_entities = set(fnmatch.filter(hass.states.async_entity_ids(), entity_id_glob)) if entity_id_glob else set()
1✔
147
    id_entities = {entity_id for entity_id in hass.states.async_entity_ids() if entity_id in entity_ids} if entity_ids else set()
1✔
148
    domain_entities = set(hass.states.async_entity_ids(domain)) if domain else set()
1✔
149
    label_entities = set()
1✔
150

151
    if labels:
1✔
UNCOV
152
        entity_registry = er.async_get(hass)
×
UNCOV
153
        label_entities = {
×
154
            entity_id for entity_id, entity in entity_registry.entities.items()
155
            if entity.labels and any(label in entity.labels for label in labels)
156
        }
157

158
    all_results = [glob_entities, id_entities, domain_entities, label_entities]
1✔
159
    if filter_mode == FilterMode.AND:
1✔
UNCOV
160
        return set.intersection(*(res for res in all_results if res))
×
161

162
    return set.union(*all_results)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc