• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zigpy / zigpy-deconz / 4568100140

pending completion
4568100140

Pull #217

github

GitHub
Merge d472ee295 into 208d8371c
Pull Request #217: Fail with a descriptive error message if startup loops endlessly

6 of 7 new or added lines in 1 file covered. (85.71%)

992 of 1013 relevant lines covered (97.93%)

3.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.41
/zigpy_deconz/zigbee/application.py
1
"""ControllerApplication for deCONZ protocol based adapters."""
2

3
from __future__ import annotations
4✔
4

5
import asyncio
4✔
6
import logging
4✔
7
import re
4✔
8
from typing import Any
4✔
9

10
import zigpy.application
4✔
11
import zigpy.config
4✔
12
import zigpy.device
4✔
13
import zigpy.endpoint
4✔
14
import zigpy.exceptions
4✔
15
from zigpy.exceptions import FormationFailure, NetworkNotFormed
4✔
16
import zigpy.state
4✔
17
import zigpy.types
4✔
18
import zigpy.util
4✔
19
import zigpy.zdo.types as zdo_t
4✔
20

21
import zigpy_deconz
4✔
22
from zigpy_deconz import types as t
4✔
23
from zigpy_deconz.api import (
4✔
24
    Deconz,
25
    NetworkParameter,
26
    NetworkState,
27
    SecurityMode,
28
    Status,
29
    TXStatus,
30
)
31
from zigpy_deconz.config import CONF_WATCHDOG_TTL, CONFIG_SCHEMA, SCHEMA_DEVICE
4✔
32
import zigpy_deconz.exception
4✔
33

34
LOGGER = logging.getLogger(__name__)
4✔
35

36
CHANGE_NETWORK_WAIT = 1
4✔
37
DELAY_NEIGHBOUR_SCAN_S = 1500
4✔
38
SEND_CONFIRM_TIMEOUT = 60
4✔
39

40
PROTO_VER_MANUAL_SOURCE_ROUTE = 0x010C
4✔
41
PROTO_VER_WATCHDOG = 0x0108
4✔
42
PROTO_VER_NEIGBOURS = 0x0107
4✔
43

44

45
class ControllerApplication(zigpy.application.ControllerApplication):
4✔
46
    SCHEMA = CONFIG_SCHEMA
4✔
47
    SCHEMA_DEVICE = SCHEMA_DEVICE
4✔
48

49
    def __init__(self, config: dict[str, Any]):
4✔
50
        """Initialize instance."""
51

52
        super().__init__(config=zigpy.config.ZIGPY_SCHEMA(config))
4✔
53
        self._api = None
4✔
54

55
        self._pending = zigpy.util.Requests()
4✔
56

57
        self.version = 0
4✔
58
        self._reset_watchdog_task = None
4✔
59
        self._reconnect_task = None
4✔
60

61
        self._written_endpoints = set()
4✔
62

63
    async def _reset_watchdog(self):
4✔
64
        while True:
2✔
65
            try:
4✔
66
                await self._api.write_parameter(
4✔
67
                    NetworkParameter.watchdog_ttl, self._config[CONF_WATCHDOG_TTL]
68
                )
69
            except Exception as e:
4✔
70
                LOGGER.warning("Failed to reset watchdog", exc_info=e)
4✔
71
                self.connection_lost(e)
4✔
72
                return
4✔
73

74
            await asyncio.sleep(self._config[CONF_WATCHDOG_TTL] * 0.75)
4✔
75

76
    async def connect(self):
4✔
77
        api = Deconz(self, self._config[zigpy.config.CONF_DEVICE])
4✔
78

79
        try:
4✔
80
            await api.connect()
4✔
81
            self.version = await api.version()
4✔
82
        except Exception:
4✔
83
            api.close()
4✔
84
            raise
4✔
85

86
        self._api = api
4✔
87
        self._written_endpoints.clear()
4✔
88

89
    def close(self):
4✔
90
        if self._reset_watchdog_task is not None:
4✔
91
            self._reset_watchdog_task.cancel()
4✔
92
            self._reset_watchdog_task = None
4✔
93

94
        if self._reconnect_task is not None:
4✔
95
            self._reconnect_task.cancel()
4✔
96
            self._reconnect_task = None
4✔
97

98
        if self._api is not None:
4✔
99
            self._api.close()
4✔
100
            self._api = None
4✔
101

102
    async def disconnect(self):
4✔
103
        self.close()
4✔
104

105
        if self._api is not None:
4✔
106
            self._api.close()
×
107

108
    async def permit_with_key(self, node: t.EUI64, code: bytes, time_s=60):
4✔
109
        raise NotImplementedError()
4✔
110

111
    async def start_network(self):
4✔
112
        await self.register_endpoints()
4✔
113
        await self.load_network_info(load_devices=False)
4✔
114
        await self._change_network_state(NetworkState.CONNECTED)
4✔
115

116
        coordinator = await DeconzDevice.new(
4✔
117
            self,
118
            self.state.node_info.ieee,
119
            self.state.node_info.nwk,
120
            self.version,
121
            self._config[zigpy.config.CONF_DEVICE][zigpy.config.CONF_DEVICE_PATH],
122
        )
123

124
        self.devices[self.state.node_info.ieee] = coordinator
4✔
125
        if self._api.protocol_version >= PROTO_VER_NEIGBOURS:
4✔
126
            await self.restore_neighbours()
4✔
127
        asyncio.create_task(self._delayed_neighbour_scan())
4✔
128

129
    async def _change_network_state(
4✔
130
        self, target_state: NetworkState, *, timeout: int = 10 * CHANGE_NETWORK_WAIT
131
    ):
132
        async def change_loop():
4✔
133
            while True:
2✔
134
                (state, _, _) = await self._api.device_state()
4✔
135
                if state.network_state == target_state:
4✔
136
                    break
4✔
137
                await asyncio.sleep(CHANGE_NETWORK_WAIT)
4✔
138

139
        await self._api.change_network_state(target_state)
4✔
140

141
        try:
4✔
142
            await asyncio.wait_for(change_loop(), timeout=timeout)
4✔
143
        except asyncio.TimeoutError:
4✔
144
            if target_state != NetworkState.CONNECTED:
4✔
NEW
145
                raise
×
146

147
            raise FormationFailure(
4✔
148
                "Network formation refused: there is likely too much RF interference."
149
                " Make sure your coordinator is on a USB 2.0 extension cable and"
150
                " away from any sources of interference, like USB 3.0 ports, SSDs,"
151
                " 2.4GHz routers, motherboards, etc."
152
            )
153

154
        if self._api.protocol_version < PROTO_VER_WATCHDOG:
4✔
155
            return
4✔
156

157
        if self._reset_watchdog_task is not None:
4✔
158
            self._reset_watchdog_task.cancel()
4✔
159

160
        if target_state == NetworkState.CONNECTED:
4✔
161
            self._reset_watchdog_task = asyncio.create_task(self._reset_watchdog())
4✔
162

163
    async def reset_network_info(self):
4✔
164
        # TODO: There does not appear to be a way to factory reset a Conbee
165
        await self.form_network()
4✔
166

167
    async def write_network_info(self, *, network_info, node_info):
4✔
168
        try:
4✔
169
            await self._api.write_parameter(
4✔
170
                NetworkParameter.nwk_frame_counter, network_info.network_key.tx_counter
171
            )
172
        except zigpy_deconz.exception.CommandError as ex:
4✔
173
            assert ex.status == Status.UNSUPPORTED
4✔
174
            LOGGER.warning(
4✔
175
                "Writing the network frame counter is not supported with this firmware,"
176
                " please update your Conbee"
177
            )
178

179
        if node_info.logical_type == zdo_t.LogicalType.Coordinator:
4✔
180
            await self._api.write_parameter(
4✔
181
                NetworkParameter.aps_designed_coordinator, 1
182
            )
183
        else:
184
            await self._api.write_parameter(
4✔
185
                NetworkParameter.aps_designed_coordinator, 0
186
            )
187

188
        await self._api.write_parameter(NetworkParameter.nwk_address, node_info.nwk)
4✔
189

190
        if node_info.ieee != zigpy.types.EUI64.UNKNOWN:
4✔
191
            # TODO: is there a way to revert it back to the hardware default? Or is this
192
            #       information lost when the parameter is overwritten?
193
            await self._api.write_parameter(
4✔
194
                NetworkParameter.mac_address, node_info.ieee
195
            )
196
            node_ieee = node_info.ieee
4✔
197
        else:
198
            (ieee,) = await self._api[NetworkParameter.mac_address]
×
199
            node_ieee = zigpy.types.EUI64(ieee)
×
200

201
        # There is no way to specify both a mask and the logical channel
202
        if network_info.channel is not None:
4✔
203
            channel_mask = zigpy.types.Channels.from_channel_list(
4✔
204
                [network_info.channel]
205
            )
206

207
            if network_info.channel_mask and channel_mask != network_info.channel_mask:
4✔
208
                LOGGER.warning(
4✔
209
                    "Channel mask %s will be replaced with current logical channel %s",
210
                    network_info.channel_mask,
211
                    channel_mask,
212
                )
213
        else:
214
            channel_mask = network_info.channel_mask
4✔
215

216
        await self._api.write_parameter(NetworkParameter.channel_mask, channel_mask)
4✔
217
        await self._api.write_parameter(NetworkParameter.use_predefined_nwk_panid, True)
4✔
218
        await self._api.write_parameter(NetworkParameter.nwk_panid, network_info.pan_id)
4✔
219
        await self._api.write_parameter(
4✔
220
            NetworkParameter.aps_extended_panid, network_info.extended_pan_id
221
        )
222
        await self._api.write_parameter(
4✔
223
            NetworkParameter.nwk_update_id, network_info.nwk_update_id
224
        )
225

226
        await self._api.write_parameter(
4✔
227
            NetworkParameter.network_key, 0, network_info.network_key.key
228
        )
229

230
        if network_info.network_key.seq != 0:
4✔
231
            LOGGER.warning(
4✔
232
                "Non-zero network key sequence number is not supported: %s",
233
                network_info.network_key.seq,
234
            )
235

236
        tc_link_key_partner_ieee = network_info.tc_link_key.partner_ieee
4✔
237

238
        if tc_link_key_partner_ieee == zigpy.types.EUI64.UNKNOWN:
4✔
239
            tc_link_key_partner_ieee = node_ieee
×
240

241
        await self._api.write_parameter(
4✔
242
            NetworkParameter.trust_center_address,
243
            tc_link_key_partner_ieee,
244
        )
245
        await self._api.write_parameter(
4✔
246
            NetworkParameter.link_key,
247
            tc_link_key_partner_ieee,
248
            network_info.tc_link_key.key,
249
        )
250

251
        if network_info.security_level == 0x00:
4✔
252
            await self._api.write_parameter(
4✔
253
                NetworkParameter.security_mode, SecurityMode.NO_SECURITY
254
            )
255
        else:
256
            await self._api.write_parameter(
4✔
257
                NetworkParameter.security_mode, SecurityMode.ONLY_TCLK
258
            )
259

260
        # Note: Changed network configuration parameters only become effective after
261
        # sending a Leave Network Request followed by a Create or Join Network Request
262
        await self._change_network_state(NetworkState.OFFLINE)
4✔
263
        await self._change_network_state(NetworkState.CONNECTED)
4✔
264

265
    async def load_network_info(self, *, load_devices=False):
4✔
266
        network_info = self.state.network_info
4✔
267
        node_info = self.state.node_info
4✔
268

269
        network_info.source = f"zigpy-deconz@{zigpy_deconz.__version__}"
4✔
270
        network_info.metadata = {
4✔
271
            "deconz": {
272
                "version": self.version,
273
            }
274
        }
275

276
        (ieee,) = await self._api[NetworkParameter.mac_address]
4✔
277
        node_info.ieee = zigpy.types.EUI64(ieee)
4✔
278
        (designed_coord,) = await self._api[NetworkParameter.aps_designed_coordinator]
4✔
279

280
        if designed_coord == 0x01:
4✔
281
            node_info.logical_type = zdo_t.LogicalType.Coordinator
4✔
282
        else:
283
            node_info.logical_type = zdo_t.LogicalType.Router
4✔
284

285
        (node_info.nwk,) = await self._api[NetworkParameter.nwk_address]
4✔
286

287
        (network_info.pan_id,) = await self._api[NetworkParameter.nwk_panid]
4✔
288
        (network_info.extended_pan_id,) = await self._api[
4✔
289
            NetworkParameter.aps_extended_panid
290
        ]
291

292
        if network_info.extended_pan_id == zigpy.types.EUI64.convert(
4✔
293
            "00:00:00:00:00:00:00:00"
294
        ):
295
            (network_info.extended_pan_id,) = await self._api[
4✔
296
                NetworkParameter.nwk_extended_panid
297
            ]
298

299
        (network_info.channel,) = await self._api[NetworkParameter.current_channel]
4✔
300
        (network_info.channel_mask,) = await self._api[NetworkParameter.channel_mask]
4✔
301
        (network_info.nwk_update_id,) = await self._api[NetworkParameter.nwk_update_id]
4✔
302

303
        if network_info.channel == 0:
4✔
304
            raise NetworkNotFormed("Network channel is zero")
4✔
305

306
        network_info.network_key = zigpy.state.Key()
4✔
307
        (
4✔
308
            _,
309
            network_info.network_key.key,
310
        ) = await self._api.read_parameter(NetworkParameter.network_key, 0)
311

312
        try:
4✔
313
            (network_info.network_key.tx_counter,) = await self._api[
4✔
314
                NetworkParameter.nwk_frame_counter
315
            ]
316
        except zigpy_deconz.exception.CommandError as ex:
4✔
317
            assert ex.status == Status.UNSUPPORTED
4✔
318

319
        network_info.tc_link_key = zigpy.state.Key()
4✔
320
        (network_info.tc_link_key.partner_ieee,) = await self._api[
4✔
321
            NetworkParameter.trust_center_address
322
        ]
323

324
        (_, network_info.tc_link_key.key) = await self._api.read_parameter(
4✔
325
            NetworkParameter.link_key,
326
            network_info.tc_link_key.partner_ieee,
327
        )
328

329
        (security_mode,) = await self._api[NetworkParameter.security_mode]
4✔
330

331
        if security_mode == SecurityMode.NO_SECURITY:
4✔
332
            network_info.security_level = 0x00
4✔
333
        elif security_mode == SecurityMode.ONLY_TCLK:
4✔
334
            network_info.security_level = 0x05
4✔
335
        else:
336
            LOGGER.warning("Unsupported security mode %r", security_mode)
4✔
337
            network_info.security_level = 0x05
4✔
338

339
    async def force_remove(self, dev):
4✔
340
        """Forcibly remove device from NCP."""
341
        pass
4✔
342

343
    async def energy_scan(
4✔
344
        self, channels: t.Channels.ALL_CHANNELS, duration_exp: int, count: int
345
    ) -> dict[int, float]:
346
        results = await super().energy_scan(
4✔
347
            channels=channels, duration_exp=duration_exp, count=count
348
        )
349

350
        # The Conbee seems to max out at an LQI of 85, which is exactly 255/3
351
        return {c: v * 3 for c, v in results.items()}
4✔
352

353
    async def add_endpoint(self, descriptor: zdo_t.SimpleDescriptor) -> None:
4✔
354
        """Register an endpoint on the device, replacing any with conflicting IDs."""
355

356
        endpoints = {}
4✔
357

358
        # Read and count the current endpoints. Some firmwares have three, others four.
359
        for index in range(255 + 1):
4✔
360
            try:
4✔
361
                _, current_descriptor = await self._api.read_parameter(
4✔
362
                    NetworkParameter.configure_endpoint, index
363
                )
364
            except zigpy_deconz.exception.CommandError as ex:
4✔
365
                assert ex.status == Status.UNSUPPORTED
4✔
366
                break
4✔
367
            else:
368
                endpoints[index] = current_descriptor
4✔
369

370
        LOGGER.debug("Got endpoint slots: %r", endpoints)
4✔
371

372
        # Don't write endpoints unnecessarily
373
        if descriptor in endpoints.values():
4✔
374
            LOGGER.debug("Endpoint already registered, skipping")
4✔
375

376
            # Pretend we wrote it
377
            self._written_endpoints.add(list(endpoints.values()).index(descriptor))
4✔
378
            return
4✔
379

380
        # Keep track of the best endpoint descriptor to replace
381
        target_index = None
4✔
382

383
        for index, current_descriptor in endpoints.items():
4✔
384
            # Ignore ones we've already written
385
            if index in self._written_endpoints:
4✔
386
                continue
4✔
387

388
            target_index = index
4✔
389

390
            if current_descriptor.endpoint == descriptor.endpoint:
4✔
391
                # Prefer to replace the endpoint with the same ID
392
                break
4✔
393

394
        if target_index is None:
4✔
395
            raise ValueError(f"No available endpoint slots exist: {endpoints!r}")
4✔
396

397
        LOGGER.debug("Writing %s to slot %r", descriptor, target_index)
4✔
398

399
        await self._api.write_parameter(
4✔
400
            NetworkParameter.configure_endpoint, target_index, descriptor
401
        )
402

403
    async def send_packet(self, packet):
4✔
404
        LOGGER.debug("Sending packet: %r", packet)
4✔
405

406
        tx_options = t.DeconzTransmitOptions.USE_NWK_KEY_SECURITY
4✔
407

408
        if (
4✔
409
            zigpy.types.TransmitOptions.ACK in packet.tx_options
410
            and packet.dst.addr_mode
411
            in (zigpy.types.AddrMode.NWK, zigpy.types.AddrMode.IEEE)
412
        ):
413
            tx_options |= t.DeconzTransmitOptions.USE_APS_ACKS
4✔
414

415
        async with self._limit_concurrency():
4✔
416
            req_id = self.get_sequence()
4✔
417

418
            with self._pending.new(req_id) as req:
4✔
419
                try:
4✔
420
                    await self._api.aps_data_request(
4✔
421
                        req_id=req_id,
422
                        dst_addr_ep=t.DeconzAddressEndpoint.from_zigpy_type(
423
                            packet.dst, packet.dst_ep or 0
424
                        ),
425
                        profile=packet.profile_id,
426
                        cluster=packet.cluster_id,
427
                        src_ep=min(1, packet.src_ep),
428
                        aps_payload=packet.data.serialize(),
429
                        tx_options=tx_options,
430
                        relays=packet.source_route,
431
                        radius=packet.radius or 0,
432
                    )
433
                except zigpy_deconz.exception.CommandError as ex:
4✔
434
                    raise zigpy.exceptions.DeliveryError(
4✔
435
                        f"Failed to enqueue packet: {ex!r}", ex.status
436
                    )
437

438
                status = await asyncio.wait_for(req.result, SEND_CONFIRM_TIMEOUT)
4✔
439

440
                if status != TXStatus.SUCCESS:
4✔
441
                    raise zigpy.exceptions.DeliveryError(
4✔
442
                        f"Failed to deliver packet: {status!r}", status
443
                    )
444

445
    def handle_rx(
4✔
446
        self, src, src_ep, dst, dst_ep, profile_id, cluster_id, data, lqi, rssi
447
    ):
448
        self.packet_received(
4✔
449
            zigpy.types.ZigbeePacket(
450
                src=src.as_zigpy_type(),
451
                src_ep=src_ep,
452
                dst=dst.as_zigpy_type(),
453
                dst_ep=dst_ep,
454
                tsn=None,
455
                profile_id=profile_id,
456
                cluster_id=cluster_id,
457
                data=zigpy.types.SerializableBytes(data),
458
                lqi=lqi,
459
                rssi=rssi,
460
            )
461
        )
462

463
    async def permit_ncp(self, time_s=60):
4✔
464
        assert 0 <= time_s <= 254
4✔
465
        await self._api.write_parameter(NetworkParameter.permit_join, time_s)
4✔
466

467
    def handle_tx_confirm(self, req_id, status):
4✔
468
        try:
4✔
469
            self._pending[req_id].result.set_result(status)
4✔
470
            return
4✔
471
        except KeyError:
4✔
472
            LOGGER.warning(
4✔
473
                "Unexpected transmit confirm for request id %s, Status: %s",
474
                req_id,
475
                status,
476
            )
477
        except asyncio.InvalidStateError as exc:
4✔
478
            LOGGER.debug(
4✔
479
                "Invalid state on future - probably duplicate response: %s", exc
480
            )
481

482
    async def restore_neighbours(self) -> None:
4✔
483
        """Restore children."""
484
        coord = self.get_device(ieee=self.state.node_info.ieee)
4✔
485

486
        for neighbor in self.topology.neighbors[coord.ieee]:
4✔
487
            try:
4✔
488
                device = self.get_device(ieee=neighbor.ieee)
4✔
489
            except KeyError:
4✔
490
                continue
4✔
491

492
            descr = device.node_desc
4✔
493
            LOGGER.debug(
4✔
494
                "device: 0x%04x - %s %s, FFD=%s, Rx_on_when_idle=%s",
495
                device.nwk,
496
                device.manufacturer,
497
                device.model,
498
                descr.is_full_function_device if descr is not None else None,
499
                descr.is_receiver_on_when_idle if descr is not None else None,
500
            )
501
            if (
4✔
502
                descr is None
503
                or descr.is_full_function_device
504
                or descr.is_receiver_on_when_idle
505
            ):
506
                continue
2✔
507
            LOGGER.debug(
4✔
508
                "Restoring %s/0x%04x device as direct child",
509
                device.ieee,
510
                device.nwk,
511
            )
512
            await self._api.add_neighbour(
4✔
513
                0x01, device.nwk, device.ieee, descr.mac_capability_flags
514
            )
515

516
    async def _delayed_neighbour_scan(self) -> None:
4✔
517
        """Scan coordinator's neighbours."""
518
        await asyncio.sleep(DELAY_NEIGHBOUR_SCAN_S)
4✔
519
        coord = self.get_device(ieee=self.state.node_info.ieee)
4✔
520
        await self.topology.scan(devices=[coord])
4✔
521

522
    def connection_lost(self, exc: Exception) -> None:
4✔
523
        """Lost connection."""
524

525
        if exc is not None:
4✔
526
            LOGGER.warning("Lost connection: %r", exc)
4✔
527

528
        self.close()
4✔
529
        self._reconnect_task = asyncio.create_task(self._reconnect_loop())
4✔
530

531
    async def _reconnect_loop(self) -> None:
4✔
532
        attempt = 1
4✔
533

534
        while True:
2✔
535
            LOGGER.debug("Reconnecting, attempt %s", attempt)
4✔
536

537
            try:
4✔
538
                await asyncio.wait_for(self.connect(), timeout=10)
4✔
539
                await asyncio.wait_for(self.initialize(), timeout=10)
4✔
540
                break
4✔
541
            except Exception as exc:
4✔
542
                wait = 2 ** min(attempt, 5)
4✔
543
                attempt += 1
4✔
544
                LOGGER.debug(
4✔
545
                    "Couldn't re-open '%s' serial port, retrying in %ss: %s",
546
                    self._config[zigpy.config.CONF_DEVICE][
547
                        zigpy.config.CONF_DEVICE_PATH
548
                    ],
549
                    wait,
550
                    str(exc),
551
                    exc_info=exc,
552
                )
553
                await asyncio.sleep(wait)
4✔
554

555
        LOGGER.debug(
4✔
556
            "Reconnected '%s' serial port after %s attempts",
557
            self._config[zigpy.config.CONF_DEVICE][zigpy.config.CONF_DEVICE_PATH],
558
            attempt,
559
        )
560

561

562
class DeconzDevice(zigpy.device.Device):
4✔
563
    """Zigpy Device representing Coordinator."""
564

565
    def __init__(self, version: int, device_path: str, *args):
4✔
566
        """Initialize instance."""
567

568
        super().__init__(*args)
4✔
569
        is_gpio_device = re.match(r"/dev/tty(S|AMA|ACM)\d+", device_path)
4✔
570
        self._model = "RaspBee" if is_gpio_device else "ConBee"
4✔
571
        self._model += " II" if ((version & 0x0000FF00) == 0x00000700) else ""
4✔
572

573
    async def add_to_group(self, grp_id: int, name: str = None) -> None:
4✔
574
        group = self.application.groups.add_group(grp_id, name)
4✔
575

576
        for epid in self.endpoints:
4✔
577
            if not epid:
4✔
578
                continue  # skip ZDO
4✔
579
            group.add_member(self.endpoints[epid])
4✔
580
        return [0]
4✔
581

582
    async def remove_from_group(self, grp_id: int) -> None:
4✔
583
        for epid in self.endpoints:
4✔
584
            if not epid:
4✔
585
                continue  # skip ZDO
4✔
586
            self.application.groups[grp_id].remove_member(self.endpoints[epid])
4✔
587
        return [0]
4✔
588

589
    @property
4✔
590
    def manufacturer(self):
4✔
591
        return "dresden elektronik"
4✔
592

593
    @property
4✔
594
    def model(self):
4✔
595
        return self._model
4✔
596

597
    @classmethod
4✔
598
    async def new(cls, application, ieee, nwk, version: int, device_path: str):
4✔
599
        """Create or replace zigpy device."""
600
        dev = cls(version, device_path, application, ieee, nwk)
4✔
601

602
        if ieee in application.devices:
4✔
603
            from_dev = application.get_device(ieee=ieee)
4✔
604
            dev.status = from_dev.status
4✔
605
            dev.node_desc = from_dev.node_desc
4✔
606
            for ep_id, from_ep in from_dev.endpoints.items():
4✔
607
                if not ep_id:
4✔
608
                    continue  # Skip ZDO
4✔
609
                ep = dev.add_endpoint(ep_id)
4✔
610
                ep.profile_id = from_ep.profile_id
4✔
611
                ep.device_type = from_ep.device_type
4✔
612
                ep.status = from_ep.status
4✔
613
                ep.in_clusters = from_ep.in_clusters
4✔
614
                ep.out_clusters = from_ep.out_clusters
4✔
615
        else:
616
            application.devices[ieee] = dev
4✔
617
            await dev.initialize()
4✔
618

619
        return dev
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc