• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / fb695d20-3f14-49c6-99d2-b1b76030971b

03 Apr 2025 02:31PM UTC coverage: 86.835% (-0.02%) from 86.857%
fb695d20-3f14-49c6-99d2-b1b76030971b

push

circleci

web-flow
add VerifiedPermissions to the client types (#12474)

1 of 2 new or added lines in 1 file covered. (50.0%)

21 existing lines in 9 files now uncovered.

63550 of 73185 relevant lines covered (86.83%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.12
/localstack-core/localstack/services/lambda_/invocation/logs.py
1
import dataclasses
1✔
2
import logging
1✔
3
import threading
1✔
4
import time
1✔
5
from queue import Queue
1✔
6
from typing import Optional, Union
1✔
7

8
from localstack.aws.connect import connect_to
1✔
9
from localstack.utils.aws.client_types import ServicePrincipal
1✔
10
from localstack.utils.bootstrap import is_api_enabled
1✔
11
from localstack.utils.threads import FuncThread
1✔
12

13
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
1✔
14

15

16
class ShutdownPill:
    """Marker type for the sentinel that is pushed onto a log queue to end its consumer loop."""
1✔
18

19

20
# Shared sentinel instance: LogHandler.stop() enqueues it, and LogHandler.run_log_loop()
# compares dequeued items against it by identity and returns when it is seen.
QUEUE_SHUTDOWN = ShutdownPill()
1✔
21

22

23
@dataclasses.dataclass(frozen=True)
class LogItem:
    """Immutable bundle of log text together with the log group and stream it is written to."""

    # name of the log group the events are put into
    log_group: str
    # name of the log stream inside that group
    log_stream: str
    # raw log text; the consumer splits it on newlines into individual events
    logs: str
1✔
28

29

30
class LogHandler:
    """Forward queued log items to the logs service from a background thread.

    Producers hand over :class:`LogItem` objects through :meth:`add_logs`. The
    thread started by :meth:`start_subscriber` runs :meth:`run_log_loop`, which
    writes every item with ``put_log_events`` and creates the log group and log
    stream when the first write reports that they do not exist. :meth:`stop`
    shuts the thread down again.
    """

    log_queue: "Queue[Union[LogItem, ShutdownPill]]"
    role_arn: str
    region: str
    _thread: Optional[FuncThread]
    _shutdown_event: threading.Event

    def __init__(self, role_arn: str, region: str) -> None:
        """Store the role and region used for the logs client and set up an idle handler.

        :param role_arn: ARN of the role that is assumed when creating the logs client
        :param region: region name the logs client is created for
        """
        self.role_arn = role_arn
        self.region = region
        self.log_queue = Queue()
        self._shutdown_event = threading.Event()
        # no worker thread exists until start_subscriber() is called
        self._thread = None

    def run_log_loop(self, *args, **kwargs) -> None:
        """Consume the queue and write each item to the logs service.

        The loop ends when the shutdown event is set or when the
        ``QUEUE_SHUTDOWN`` sentinel is dequeued. Any exception raised while
        writing an item is logged as a warning and the loop moves on to the next
        item. ``args`` and ``kwargs`` are accepted but not used (presumably to
        match what the thread wrapper passes to its target - verify against
        ``FuncThread``).
        """
        logs_client = connect_to.with_assumed_role(
            region_name=self.region,
            role_arn=self.role_arn,
            service_principal=ServicePrincipal.lambda_,
        ).logs
        while not self._shutdown_event.is_set():
            # blocks until an item (or the shutdown sentinel) is available
            log_item = self.log_queue.get()
            if log_item is QUEUE_SHUTDOWN:
                return
            # we need to split by newline - but keep the newlines in the strings
            # strips empty lines, as they are not accepted by cloudwatch
            logs = [line + "\n" for line in log_item.logs.split("\n") if line]
            # until we have a better way to have timestamps, log events have the same time for a
            # single invocation. The timestamp is taken once per item so that all events of the
            # batch really share it and cannot end up out of order.
            timestamp = int(time.time() * 1000)
            log_events = [{"timestamp": timestamp, "message": log_line} for log_line in logs]
            try:
                try:
                    logs_client.put_log_events(
                        logGroupName=log_item.log_group,
                        logStreamName=log_item.log_stream,
                        logEvents=log_events,
                    )
                except logs_client.exceptions.ResourceNotFoundException:
                    # create new log group
                    try:
                        logs_client.create_log_group(logGroupName=log_item.log_group)
                    except logs_client.exceptions.ResourceAlreadyExistsException:
                        # the group is already there, only the stream was missing
                        pass
                    logs_client.create_log_stream(
                        logGroupName=log_item.log_group, logStreamName=log_item.log_stream
                    )
                    # retry the write now that group and stream exist
                    logs_client.put_log_events(
                        logGroupName=log_item.log_group,
                        logStreamName=log_item.log_stream,
                        logEvents=log_events,
                    )
            except Exception as e:
                # best effort: a failed write must not terminate the consumer loop
                LOG.warning(
                    "Error saving logs to group %s in region %s: %s",
                    log_item.log_group,
                    self.region,
                    e,
                )

    def start_subscriber(self) -> None:
        """Start the background thread running :meth:`run_log_loop`.

        Does nothing besides emitting a debug message when the ``logs`` API is
        not enabled.
        """
        if not is_api_enabled("logs"):
            LOG.debug("Service 'logs' is disabled, not storing any logs for lambda executions")
            return
        self._thread = FuncThread(self.run_log_loop, name="log_handler")
        self._thread.start()

    def add_logs(self, log_item: LogItem) -> None:
        """Enqueue ``log_item`` for the background thread.

        The item is silently dropped when the ``logs`` API is not enabled.

        :param log_item: log text plus the log group and stream it belongs to
        """
        if not is_api_enabled("logs"):
            return
        self.log_queue.put(log_item)

    def stop(self) -> None:
        """Ask the background thread to stop and wait up to two seconds for it.

        An error is logged when the thread is still alive after the timeout; the
        reference to the thread is dropped in either case.
        """
        self._shutdown_event.set()
        if self._thread:
            # the sentinel wakes up a consumer that is blocked in Queue.get()
            self.log_queue.put(QUEUE_SHUTDOWN)
            self._thread.join(timeout=2)
            if self._thread.is_alive():
                LOG.error("Could not stop log subscriber in time")
            self._thread = None
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc