• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 7ae8c08b-27e7-4df6-bfb9-4892ae974ff7

17 Mar 2025 11:44PM UTC coverage: 86.954% (+0.02%) from 86.93%
7ae8c08b-27e7-4df6-bfb9-4892ae974ff7

push

circleci

web-flow
SNS: fix Filter Policy engine to not evaluate full complex payload (#12395)

8 of 8 new or added lines in 1 file covered. (100.0%)

27 existing lines in 8 files now uncovered.

62326 of 71677 relevant lines covered (86.95%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.66
/localstack-core/localstack/services/lambda_/invocation/logs.py
1
import dataclasses
1✔
2
import logging
1✔
3
import threading
1✔
4
from queue import Queue
1✔
5
from typing import Optional, Union
1✔
6

7
from localstack.aws.connect import connect_to
1✔
8
from localstack.utils.aws.client_types import ServicePrincipal
1✔
9
from localstack.utils.bootstrap import is_api_enabled
1✔
10
from localstack.utils.cloudwatch.cloudwatch_util import store_cloudwatch_logs
1✔
11
from localstack.utils.threads import FuncThread
1✔
12

13
LOG = logging.getLogger(__name__)
1✔
14

15

16
class ShutdownPill:
    """Marker type for the sentinel object that tells a log consumer to stop.

    Carries no state; only the identity of an instance is meaningful.
    """
1✔
18

19

20
# Module-wide sentinel: LogHandler.stop() enqueues this object and the log loop
# returns as soon as it dequeues it (compared by identity, not equality).
QUEUE_SHUTDOWN = ShutdownPill()
1✔
21

22

23
@dataclasses.dataclass(frozen=True)
class LogItem:
    """Immutable unit of work for the log loop: one batch of log text.

    The three fields are passed unchanged to ``store_cloudwatch_logs`` by
    ``LogHandler.run_log_loop``.
    """

    # Name of the log group the text is stored under.
    log_group: str
    # Name of the log stream within that group.
    log_stream: str
    # The log text itself.
    logs: str
1✔
28

29

30
class LogHandler:
1✔
31
    log_queue: "Queue[Union[LogItem, ShutdownPill]]"
1✔
32
    role_arn: str
1✔
33
    _thread: Optional[FuncThread]
1✔
34
    _shutdown_event: threading.Event
1✔
35

36
    def __init__(self, role_arn: str, region: str) -> None:
1✔
37
        self.role_arn = role_arn
1✔
38
        self.region = region
1✔
39
        self.log_queue = Queue()
1✔
40
        self._shutdown_event = threading.Event()
1✔
41
        self._thread = None
1✔
42

43
    def run_log_loop(self, *args, **kwargs) -> None:
1✔
44
        logs_client = connect_to.with_assumed_role(
1✔
45
            region_name=self.region,
46
            role_arn=self.role_arn,
47
            service_principal=ServicePrincipal.lambda_,
48
        ).logs
49
        while not self._shutdown_event.is_set():
1✔
50
            log_item = self.log_queue.get()
1✔
51
            if log_item is QUEUE_SHUTDOWN:
1✔
52
                return
1✔
53
            try:
1✔
54
                store_cloudwatch_logs(
1✔
55
                    logs_client, log_item.log_group, log_item.log_stream, log_item.logs
56
                )
UNCOV
57
            except Exception as e:
×
UNCOV
58
                LOG.warning(
×
59
                    "Error saving logs to group %s in region %s: %s",
60
                    log_item.log_group,
61
                    self.region,
62
                    e,
63
                )
64

65
    def start_subscriber(self) -> None:
1✔
66
        if not is_api_enabled("logs"):
1✔
67
            LOG.debug("Service 'logs' is disabled, not storing any logs for lambda executions")
×
68
            return
×
69
        self._thread = FuncThread(self.run_log_loop, name="log_handler")
1✔
70
        self._thread.start()
1✔
71

72
    def add_logs(self, log_item: LogItem) -> None:
1✔
73
        if not is_api_enabled("logs"):
1✔
74
            return
×
75
        self.log_queue.put(log_item)
1✔
76

77
    def stop(self) -> None:
1✔
78
        self._shutdown_event.set()
1✔
79
        if self._thread:
1✔
80
            self.log_queue.put(QUEUE_SHUTDOWN)
1✔
81
            self._thread.join(timeout=2)
1✔
82
            if self._thread.is_alive():
1✔
83
                LOG.error("Could not stop log subscriber in time")
×
84
            self._thread = None
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc