• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bcpearce / Amcrest-API / 19658816921

25 Nov 2025 05:01AM UTC coverage: 82.482% (+0.2%) from 82.249%
19658816921

Pull #18

github

web-flow
Merge 21446e6c1 into 45d7f26d9
Pull Request #18: Add privacy mode capability

565 of 685 relevant lines covered (82.48%)

0.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.55
amcrest_api/camera.py
1
"""Amcrest Camera"""
2

3
import asyncio
1✔
4
from collections.abc import (  # pylint: disable=import-error
1✔
5
    AsyncGenerator,
6
    Awaitable,
7
)
8
from datetime import datetime, timedelta
1✔
9
from ssl import SSLContext
1✔
10
from typing import Any
1✔
11

12
from httpx import AsyncClient, DigestAuth, HTTPStatusError, Request, Response
1✔
13
from yarl import URL
1✔
14

15
from amcrest_api.error import UnsupportedStreamSubtype
1✔
16

17
from . import utils
1✔
18
from .config import Config
1✔
19
from .const import STREAM_TYPE_DICT, ApiEndpoints, StreamType
1✔
20
from .event import EventBase, EventMessageData, parse_event_message
1✔
21
from .imaging import ConfigNo, Lighting, VideoDayNight, VideoImageControl
1✔
22
from .ptz import (
1✔
23
    PtzAccuratePosition,
24
    PtzBasicMove,
25
    PtzCapabilityData,
26
    PtzPresetData,
27
    PtzRelativeMove,
28
    PtzStatusData,
29
)
30
from .storage import StorageDeviceInfo
1✔
31

32

33
class Camera:
1✔
34
    """Class for an Amcrest camera implementing the API."""
35

36
    _ptz_capabilities: PtzCapabilityData | None = None
1✔
37
    _client: AsyncClient | None = None
1✔
38
    _fixed_config: Config | None = None
1✔
39

40
    def __init__(
1✔
41
        self,
42
        host: str,
43
        username: str,
44
        password: str,
45
        *,
46
        port: int = 80,
47
        scheme: str = "http",
48
        verify: bool | SSLContext = True,
49
    ) -> None:
50
        self._username = username
1✔
51
        self._password = password
1✔
52
        self._scheme = scheme
1✔
53
        self._host = host
1✔
54
        self._port = port
1✔
55
        self._verify = verify
1✔
56

57
    async def async_get_fixed_config(self) -> Config:
        """Read a number of properties that should be cached for the API session.

        The camera is only queried the first time this is called; the resulting
        ``Config`` is stored on the instance and returned by every later call.

        Returns:
            The cached ``Config`` for this session.
        """
        if self._fixed_config is not None:
            return self._fixed_config

        config: dict[str, Any] = {}
        config["device_type"] = await self.async_device_type
        config["hardware_version"] = await self.async_hardware_version
        config["machine_name"] = await self.async_machine_name
        config["max_extra_stream"] = await self.async_max_extra_stream
        config["network"] = (await self.async_network_config)["Network"]
        config["ptz_capabilities"] = await self.async_ptz_capabilities
        config["serial_number"] = await self.async_serial_number
        config["software_version"] = await self.async_software_version
        config["supported_events"] = await self.async_supported_events
        # Only stream types up to the camera's reported maximum are supported.
        config["supported_streams"] = {
            k: v
            for k, v in STREAM_TYPE_DICT.items()
            if k <= config["max_extra_stream"]
        }
        config["privacy_mode_available"] = await self.async_privacy_mode_available

        # Record the physical address of the network entry whose IP address
        # matches the host this session connects to. Non-dict entries in the
        # network section are skipped.
        for value in config["network"].values():
            if isinstance(value, dict) and value.get("IPAddress") == self._host:
                config["session_physical_address"] = value["PhysicalAddress"]
        self._fixed_config = Config(**config)
        return self._fixed_config
82

83
    @property
1✔
84
    def url(self) -> URL:
1✔
85
        """Provide the URL for accessing the web interface."""
86
        return URL.build(scheme=self._scheme, host=self._host, port=self._port)
1✔
87

88
    async def async_get_rtsp_url(
        self, *, channel: int = 1, subtype: int = StreamType.MAIN
    ) -> URL | None:
        """
        Returns the streaming URL including credentials.
        ***Warning*** this will be in plaintext instead of digest form.
        This is not cached as the RTSP port can be reconfigured during a session.

        Args:
            channel (int): channel number placed in the URL query
            subtype (int): stream subtype placed in the URL query

        Returns:
            The RTSP URL, or None when the camera reports RTSP as disabled.

        Raises:
            UnsupportedStreamSubtype: if ``subtype`` exceeds the camera's
                maximum extra stream.
        """
        max_substream = (await self.async_get_fixed_config()).max_extra_stream
        if subtype > max_substream:
            # Adjacent literals are used instead of a backslash continuation
            # so that source indentation does not leak into the message.
            raise UnsupportedStreamSubtype(
                f"Camera does not support substream {subtype}, "
                f"max substream is {max_substream}"
            )
        rtsp_conf: dict = (
            await self._async_api_request(
                ApiEndpoints.CONFIG_MANAGER,
                params={"action": "getConfig", "name": "RTSP"},
            )
        )["RTSP"]
        if rtsp_conf["Enable"] == "false":
            return None
        return URL.build(
            scheme="rtsp",
            user=self._username,
            password=self._password,
            host=self._host,
            port=int(rtsp_conf["Port"]),
            path=ApiEndpoints.REALTIME_STREAM,
            query={"channel": channel, "subtype": subtype},
        )
120

121
    async def async_listen_events(
        self,
        *,
        heartbeat_seconds: int = 10,
        filter_events: list[str] | None = None,
    ) -> AsyncGenerator[EventBase | None, None]:
        """
        Asynchronously listen to events.

        A dedicated client is created for the event stream, with a timeout of
        twice the heartbeat interval, and each received text chunk is parsed
        and yielded.

        Args:
            heartbeat_seconds (int):
                an interval to request heartbeats to keep the connection alive
            filter_events (list[EventMessageTypes]|None):
                a list of events to listen to, or None for all capabilities

        Yields:
            The result of ``parse_event_message`` for each received chunk.
        """
        # An empty or missing filter falls back to every event the camera reports.
        filter_events = filter_events or await self.async_supported_events
        filter_events_param = f"[{','.join(filter_events)}]"  # type: ignore[arg-type]

        async with (
            self._create_async_client(timeout=heartbeat_seconds * 2) as client,
            client.stream(
                "GET",
                ApiEndpoints.EVENT_MANAGER,
                params={
                    "action": "attach",
                    "codes": filter_events_param,
                    "heartbeat": heartbeat_seconds,
                },  # noqa: E501
            ) as stream,
        ):
            try:
                async for txt in stream.aiter_text():
                    event_message = EventMessageData(txt)
                    yield parse_event_message(str(event_message.content))
            finally:
                # Close the response explicitly when the consumer stops
                # iterating or an error interrupts the stream.
                await stream.aclose()
159

160
    @property
1✔
161
    async def async_serial_number(self):
1✔
162
        """Get serial number."""
163
        return (
1✔
164
            await self._async_api_request(
165
                ApiEndpoints.MAGIC_BOX, params={"action": "getSerialNo"}
166
            )
167
        )["sn"]
168

169
    @property
1✔
170
    async def async_device_type(self):
1✔
171
        """Get device type/model name."""
172
        return (
1✔
173
            await self._async_api_request(
174
                ApiEndpoints.MAGIC_BOX, params={"action": "getDeviceType"}
175
            )
176
        )["type"]
177

178
    @property
1✔
179
    async def async_hardware_version(self):
1✔
180
        """Get hardware version."""
181
        return (
1✔
182
            await self._async_api_request(
183
                ApiEndpoints.MAGIC_BOX, params={"action": "getHardwareVersion"}
184
            )
185
        )["version"]
186

187
    @property
1✔
188
    async def async_max_extra_stream(self):
1✔
189
        """Get max extra streams."""
190
        return int(
1✔
191
            (
192
                await self._async_api_request(
193
                    ApiEndpoints.MAGIC_BOX,
194
                    params={"action": "getProductDefinition", "name": "MaxExtraStream"},
195
                )
196
            )["MaxExtraStream"]
197
        )
198

199
    @property
1✔
200
    async def async_general_config(self):
1✔
201
        """Get general config."""
202
        return await self._async_api_request(
1✔
203
            ApiEndpoints.CONFIG_MANAGER,
204
            params={"action": "getConfig", "name": "General"},
205
        )
206

207
    @property
1✔
208
    async def async_network_config(self):
1✔
209
        """Get network config."""
210
        return await self._async_api_request(
1✔
211
            ApiEndpoints.CONFIG_MANAGER,
212
            params={"action": "getConfig", "name": "Network"},
213
        )
214

215
    @property
1✔
216
    async def async_software_version(self):
1✔
217
        """Get software version."""
218
        return (
1✔
219
            await self._async_api_request(
220
                ApiEndpoints.MAGIC_BOX, params={"action": "getSoftwareVersion"}
221
            )
222
        )["version"]
223

224
    @property
1✔
225
    async def async_machine_name(self) -> str:
1✔
226
        """Get machine name."""
227
        return (await self.async_general_config)["General"]["MachineName"]
1✔
228

229
    @property
1✔
230
    async def async_snap_config(self):
1✔
231
        """Get snap config."""
232
        return await self._async_api_request(
×
233
            ApiEndpoints.CONFIG_MANAGER,
234
            params={"action": "getConfig", "name": "Snap"},
235
        )
236

237
    @property
1✔
238
    async def async_lighting_config(self) -> list[list[list[Lighting]]]:
1✔
239
        """Get lighting config."""
240
        return Lighting.create_from_response(
1✔
241
            await self._async_api_request(
242
                ApiEndpoints.CONFIG_MANAGER,
243
                params={"action": "getConfig", "name": "Lighting_V2"},
244
            )
245
        )
246

247
    async def async_set_lighting_config(
        self, config_no, light: Lighting, index: int = 0, channel: int = 1
    ) -> None:
        """Send the populated fields of ``light`` to the Lighting_V2 config.

        Only truthy fields are sent, so a field left unset (or set to a falsy
        value such as 0) is omitted from the request.

        Args:
            config_no: second index into the Lighting_V2 table
            light (Lighting): values to write
            index (int): third index into the Lighting_V2 table
            channel (int): 1-based channel; sent to the camera as ``channel - 1``
        """
        entry = f"Lighting_V2[{channel - 1}][{config_no}][{index}]"
        params: dict[str, Any] = {"action": "setConfig"}

        middle = light.middle_light
        if middle:
            params[f"{entry}.MiddleLight[0].Light"] = middle.light
            params[f"{entry}.MiddleLight[0].Angle"] = middle.angle

        # Remaining scalar fields map one-to-one onto config keys.
        scalar_fields = (
            ("Correction", light.correction),
            ("Mode", light.mode),
            ("Sensitive", light.sensitivity),
        )
        for key, setting in scalar_fields:
            if setting:
                params[f"{entry}.{key}"] = setting

        await self._async_api_request(ApiEndpoints.CONFIG_MANAGER, params=params)
272

273
    @property
    async def async_encode_capability(self) -> dict[str, Any]:
        """Get encoding capabilities.

        Awaiting this property yields the parsed response of the ``getCaps``
        action on the encode endpoint.
        """
        # The coroutine itself is what makes this awaitable, so the annotation
        # names the awaited result rather than wrapping it in Awaitable.
        return await self._async_api_request(
            ApiEndpoints.ENCODE, params={"action": "getCaps"}
        )
279

280
    @property
1✔
281
    async def async_supported_events(self) -> list[str]:
1✔
282
        """Get a list of supported events."""
283
        response_content = await self._async_api_request(
1✔
284
            ApiEndpoints.EVENT_MANAGER, params={"action": "getExposureEvents"}
285
        )
286
        return utils.indexed_dict_to_list(response_content["events"])
1✔
287

288
    @property
    async def async_ptz_preset_info(
        self,
        channel: int = 1,
    ) -> list[PtzPresetData]:
        """Asynchronously fetch the PTZ presets stored on the camera.

        Because this is accessed as a property, ``channel`` always takes its
        default value of 1.

        Returns:
            One ``PtzPresetData`` per preset in the response; an empty list
            when the response has no "presets" entry.
        """
        reply = await self._async_api_request(
            ApiEndpoints.PTZ,
            params={"action": "getPresets", "channel": channel},
        )
        raw_presets = utils.indexed_dict_to_list(reply.get("presets", {}))
        return [
            PtzPresetData(index=entry["Index"], name=entry["Name"])
            for entry in raw_presets
        ]
304

305
    async def async_set_ptz_preset(
1✔
306
        self, preset: PtzPresetData, channel: int = 1
307
    ) -> None:
308
        """Asynchronously save the current position as a preset."""
309
        await self._async_api_request(
×
310
            ApiEndpoints.PTZ,
311
            params={
312
                "action": "start",
313
                "code": "SetPreset",
314
                "channel": channel,
315
                "arg1": 0,
316
                "arg2": preset.index,
317
                "arg3": 0,
318
            },
319
        )
320
        await self._async_api_request(
×
321
            ApiEndpoints.PTZ,
322
            params={
323
                "action": "setPreset",
324
                "channel": channel,
325
                "arg1": preset.index,
326
                "arg2": preset.name,
327
            },
328
        )
329

330
    async def async_clear_ptz_preset(
1✔
331
        self, preset: PtzPresetData | int, channel: int = 1
332
    ) -> None:
333
        """Asynchronously delete a preset."""
334
        index = preset.index if isinstance(preset, PtzPresetData) else preset
×
335
        await self._async_api_request(
×
336
            ApiEndpoints.PTZ,
337
            params={
338
                "action": "start",
339
                "code": "ClearPreset",
340
                "channel": channel,
341
                "arg1": 0,
342
                "arg2": index,
343
                "arg3": 0,
344
            },
345
        )
346

347
    async def async_ptz_move_to_preset(
        self, preset_number: int, channel: int = 1
    ) -> None:
        """Asynchronously move to a preset.

        Args:
            preset_number (int): index of the preset, sent as ``arg2``
            channel (int): channel the command is addressed to
        """
        # The response is not returned, matching the declared ``None`` return
        # type and the other PTZ command methods on this class.
        await self._async_api_request(
            ApiEndpoints.PTZ,
            params={
                "action": "start",
                "code": "GotoPreset",
                "channel": channel,
                "arg1": 0,
                "arg2": preset_number,
                "arg3": 0,
            },
        )
362

363
    async def async_ptz_move_relative(self, relative_move: PtzRelativeMove) -> None:
1✔
364
        """Move the PTZ control relatively."""
365
        if relative_move.caps is None:
×
366
            relative_move.caps = await self.async_ptz_capabilities
×
367
        await self._async_api_request(
×
368
            ApiEndpoints.PTZ, params=relative_move.get_query_dict()
369
        )
370

371
    async def async_ptz_move_absolute(self, absolute_move: PtzAccuratePosition) -> None:
1✔
372
        """Move the PTZ control to absolute position."""
373
        if absolute_move.caps is None:
×
374
            absolute_move.caps = await self.async_ptz_capabilities
×
375
        await self._async_api_request(
×
376
            ApiEndpoints.PTZ, params=absolute_move.get_query_dict()
377
        )
378

379
    async def async_ptz_move(
1✔
380
        self, continuous_move: PtzBasicMove, delay_till_stop: timedelta | None = None
381
    ) -> None:
382
        """Begin PTZ Move."""
383
        if continuous_move.caps is None:
×
384
            continuous_move.caps = await self.async_ptz_capabilities
×
385
        await self._async_api_request(
×
386
            ApiEndpoints.PTZ, params=continuous_move.get_start_query_dict()
387
        )
388
        if delay_till_stop is not None:
×
389
            await asyncio.sleep(delay_till_stop.total_seconds())
×
390
            await self.async_ptz_stop(continuous_move)
×
391

392
    async def async_ptz_stop(self, continuous_move: PtzBasicMove) -> None:
1✔
393
        """Stop PTZ Move."""
394
        if continuous_move.caps is None:
×
395
            continuous_move.caps = await self.async_ptz_capabilities
×
396
        await self._async_api_request(
×
397
            ApiEndpoints.PTZ, params=continuous_move.get_stop_query_dict()
398
        )
399

400
    @property
1✔
401
    async def async_ptz_capabilities(self, channel: int = 1) -> PtzCapabilityData:
1✔
402
        """Get PTZ capabilities."""
403
        self._ptz_capabilities = (
1✔
404
            self._ptz_capabilities
405
            or PtzCapabilityData.create_from_response(
406
                await self._async_api_request(
407
                    ApiEndpoints.PTZ,
408
                    params={"action": "getCurrentProtocolCaps", "channel": channel},
409
                )
410
            )
411
        )
412
        return self._ptz_capabilities
1✔
413

414
    @property
1✔
415
    async def async_ptz_status(self, channel: int = 1) -> PtzStatusData:
1✔
416
        """Get PTZ status."""
417
        return PtzStatusData.create_from_response(
1✔
418
            await self._async_api_request(
419
                ApiEndpoints.PTZ, params={"action": "getStatus", "channel": channel}
420
            )
421
        )
422

423
    @property
1✔
424
    async def async_storage_list(self) -> list[str]:
1✔
425
        """Get list of storage device paths."""
426
        res = await self._async_api_request(
1✔
427
            ApiEndpoints.STORAGE_DEVICE, params={"action": "factory.getCollect"}
428
        )
429
        return utils.indexed_dict_to_list(res.get("list", {}))
1✔
430

431
    @property
    async def async_storage_info(self) -> list[StorageDeviceInfo]:
        """Fetch details for the camera's storage devices.

        The device list is checked first; the detailed query is only issued
        when that list is non-empty.

        Returns:
            Parsed storage device info, or an empty list when the camera
            reports no storage devices.
        """
        storage_paths = await self.async_storage_list
        if len(storage_paths) == 0:
            return []
        raw_info = await self._async_api_request(
            ApiEndpoints.STORAGE_DEVICE,
            params={
                "action": "getDeviceAllInfo",
            },
        )
        return StorageDeviceInfo.create_from_response(raw_info)
444

445
    @property
1✔
446
    async def async_video_image_control(self) -> list[VideoImageControl]:
1✔
447
        """Get flip, mirror, and rotate settings."""
448
        return VideoImageControl.create_from_response(
1✔
449
            await self._async_api_request(
450
                ApiEndpoints.CONFIG_MANAGER,
451
                params={"action": "getConfig", "name": "VideoImageControl"},
452
            )
453
        )
454

455
    async def async_set_video_image_control(
1✔
456
        self, video_image_control: VideoImageControl, channel: int = 1
457
    ) -> None:
458
        """Set image control settings."""
459
        await self._async_api_request(
×
460
            ApiEndpoints.CONFIG_MANAGER,
461
            params={
462
                "action": "setConfig",
463
                f"VideoImageControl[{channel - 1}].Flip": video_image_control.flip,
464
                f"VideoImageControl[{channel - 1}].Mirror": video_image_control.mirror,
465
                f"VideoImageControl[{channel - 1}].Rotate90": video_image_control.rotate_90,  # noqa: E501
466
            },
467
        )
468

469
    async def async_get_video_in_day_night(
1✔
470
        self,
471
    ) -> list[list[VideoDayNight]]:
472
        """Video input day/night settings."""
473
        return VideoDayNight.create_from_response(
1✔
474
            await self._async_api_request(
475
                ApiEndpoints.CONFIG_MANAGER,
476
                params={"action": "getConfig", "name": "VideoInDayNight"},
477
            )
478
        )
479

480
    async def async_set_video_in_day_night(
1✔
481
        self,
482
        video_day_night: VideoDayNight,
483
        config_no: ConfigNo,
484
        channel: int = 1,
485
    ) -> None:
486
        """Set video input day/night settings for a config and channel."""
487
        await self._async_api_request(
×
488
            ApiEndpoints.CONFIG_MANAGER,
489
            params={
490
                "action": "setConfig",
491
                f"VideoInDayNight[{channel - 1}][{config_no}].Type": video_day_night.type,  # noqa: E501
492
                f"VideoInDayNight[{channel - 1}][{config_no}].Mode": video_day_night.mode,  # noqa: E501
493
                f"VideoInDayNight[{channel - 1}][{config_no}].Sensitivity": video_day_night.sensitivity,  # noqa: E501
494
                f"VideoInDayNight[{channel - 1}][{config_no}].Delay": video_day_night.delay_seconds,  # noqa: E501
495
            },
496
        )
497

498
    @property
1✔
499
    async def async_privacy_mode_available(self) -> bool:
1✔
500
        """Get privacy mode capability."""
501
        try:
1✔
502
            await self.async_get_privacy_mode_on()
1✔
503
            return True
1✔
504
        except HTTPStatusError:
1✔
505
            return False
1✔
506

507
    async def async_set_privacy_mode_on(self, on: bool, channel: int = 1) -> None:
1✔
508
        """Set privacy mode on or off."""
509
        await self._async_api_request(
×
510
            ApiEndpoints.CONFIG_MANAGER,
511
            params={"action": "setConfig", f"LeLensMask[{channel - 1}].Enable": on},
512
        )
513

514
    async def async_get_privacy_mode_on(self) -> bool:
        """Read whether privacy mode is currently enabled.

        Reads the "Enable" value of the first ``LeLensMask`` entry.

        Returns:
            True when the camera reports "true", False when it reports "false"
            (compared case-insensitively).

        Raises:
            ValueError: if the reported value is neither "true" nor "false".
        """
        reply = await self._async_api_request(
            ApiEndpoints.CONFIG_MANAGER,
            params={"action": "getConfig", "name": "LeLensMask"},
        )
        state = reply["LeLensMask"][0]["Enable"].lower()
        if state not in ("true", "false"):
            raise ValueError("Unexpected response reading privacy mode status")
        return state == "true"
526

527
    async def async_set_smart_track_on(self, on: bool) -> None:
1✔
528
        """Set smart tracking mode on or off."""
529
        await self._async_api_request(
×
530
            ApiEndpoints.CONFIG_MANAGER,
531
            params={"action": "setConfig", "LeSmartTrack[0].Enable": on},
532
        )
533

534
    async def async_get_smart_track_on(self) -> bool:
        """Get smart tracking state.

        Reads the "Enable" value of the first ``LeSmartTrack`` entry.

        Returns:
            True when the camera reports "true", False when it reports "false"
            (compared case-insensitively).

        Raises:
            ValueError: if the reported value is neither "true" nor "false".
        """
        response = await self._async_api_request(
            ApiEndpoints.CONFIG_MANAGER,
            params={"action": "getConfig", "name": "LeSmartTrack"},
        )
        if (enabled := response["LeSmartTrack"][0]["Enable"].lower()) in [
            "true",
            "false",
        ]:
            return enabled == "true"
        raise ValueError("Unexpected response reading smart track status")
546

547
    async def async_snapshot(self, channel: int = 1, subtype: int = 0) -> bytes:
1✔
548
        """Get a still frame from the camera."""
549
        response: bytes = await self._async_api_request(
×
550
            ApiEndpoints.SNAPSHOT, params={"channel": channel, "type": subtype}
551
        )
552
        return response
×
553

554
    async def async_get_current_time(self) -> datetime:
1✔
555
        """Get the current time from the camera."""
556
        response = await self._async_api_request(
1✔
557
            ApiEndpoints.GLOBAL, params={"action": "getCurrentTime"}
558
        )
559
        return datetime.strptime(response["result"], "%Y-%m-%d %H:%M:%S")
1✔
560

561
    async def async_set_current_time(self, set_time: datetime | None = None) -> None:
        """
        Push a clock value to the camera.

        When set_time is omitted, the local system time at the moment of the
        call is used. The value is sent formatted as "YYYY-MM-DD HH:MM:SS".
        """
        moment = datetime.now() if set_time is None else set_time
        await self._async_api_request(
            ApiEndpoints.GLOBAL,
            params={
                "action": "setCurrentTime",
                "time": moment.strftime("%Y-%m-%d %H:%M:%S"),
            },
        )
573

574
    async def aclose_client(self) -> None:
        """
        Close the client.

        Always call this when wrapping up use of the class
        or use the asynchronous context manager.

        Calling this with no client, or with a client that is already closed,
        is safe.
        """
        if self._client is None:
            return
        if not self._client.is_closed:
            await self._client.aclose()
        # Drop the reference even when the client was already closed, so the
        # next request builds a fresh client instead of reusing a dead one.
        self._client = None
584

585
    def _create_async_client(self, **kwargs):
1✔
586
        return AsyncClient(
1✔
587
            auth=DigestAuth(self._username, self._password),
588
            base_url=str(self.url),
589
            verify=self._verify,
590
            **kwargs,
591
        )
592

593
    async def _async_api_request(
        self, endpoint, *, method="GET", params: dict[str, Any] | None = None
    ):
        """Send one request to the camera and return the parsed response.

        Args:
            endpoint: URL (relative to the client's base URL) to request
            method: HTTP method to use
            params: query parameters for the request

        Returns:
            Whatever ``utils.parse_response`` produces for the response.

        Raises:
            HTTPStatusError: if the camera answers with an error status.
        """
        # Build the client lazily; also rebuild it if a previous one was
        # closed, since a closed client cannot send requests.
        if self._client is None or self._client.is_closed:
            self._client = self._create_async_client()
        request: Request = self._client.build_request(
            method=method, url=endpoint, params=params
        )
        response: Response = await self._client.send(request=request)
        response.raise_for_status()
        return utils.parse_response(response)
604

605
    async def __aenter__(self):
1✔
606
        self._client = self._create_async_client()
1✔
607
        return self
1✔
608

609
    async def __aexit__(self, exc_type, exc_value, traceback):
1✔
610
        await self.aclose_client()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc