• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 21697093787

04 Feb 2026 09:56PM UTC coverage: 86.962% (-0.004%) from 86.966%
21697093787

push

github

web-flow
improve system information sent in session and container_info (#13680)

10 of 17 new or added lines in 2 files covered. (58.82%)

222 existing lines in 17 files now uncovered.

70560 of 81139 relevant lines covered (86.96%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.04
/localstack-core/localstack/services/lambda_/invocation/logs.py
1
import dataclasses
1✔
2
import logging
1✔
3
import threading
1✔
4
import time
1✔
5
from queue import Queue
1✔
6

7
from localstack.aws.connect import connect_to
1✔
8
from localstack.utils.aws.client_types import ServicePrincipal
1✔
9
from localstack.utils.bootstrap import is_api_enabled
1✔
10
from localstack.utils.threads import FuncThread
1✔
11

12
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
1✔
13

14

15
class ShutdownPill:
    """Marker type for the sentinel that tells a log loop to terminate.

    Instances carry no state; consumers compare queue items against a known
    instance by identity.
    """
1✔
17

18

19
# Sentinel enqueued by LogHandler.stop(); LogHandler.run_log_loop() returns as
# soon as it dequeues this exact object (identity comparison).
QUEUE_SHUTDOWN = ShutdownPill()
1✔
20

21

22
@dataclasses.dataclass(frozen=True)
class LogItem:
    """Immutable unit of log output destined for one log group / log stream."""

    # Name of the target log group.
    log_group: str
    # Name of the target log stream inside ``log_group``.
    log_stream: str
    # Raw log text; may contain several newline-separated lines.
    logs: str
1✔
27

28

29
class LogHandler:
    """Forward queued Lambda log output to the ``logs`` service from a background thread.

    Producers call :meth:`add_logs`; a worker thread started by
    :meth:`start_subscriber` drains the queue in :meth:`run_log_loop` and writes
    each item with ``put_log_events``, creating the log group and log stream on
    demand. :meth:`stop` signals the worker to exit.
    """

    log_queue: "Queue[LogItem | ShutdownPill]"
    role_arn: str
    region: str
    _thread: FuncThread | None
    _shutdown_event: threading.Event

    def __init__(self, role_arn: str, region: str) -> None:
        """Create a handler that is not yet running.

        :param role_arn: ARN of the role assumed when creating the logs client.
        :param region: region name used for the logs client.
        """
        self.role_arn = role_arn
        self.region = region
        self.log_queue = Queue()
        self._shutdown_event = threading.Event()
        self._thread = None

    @staticmethod
    def _build_log_events(raw_logs: str) -> list[dict]:
        """Convert raw log text into a list of log event dicts.

        The text is split on newlines; empty lines are dropped, as they are not
        accepted by cloudwatch, and every kept line gets its trailing newline
        back. Until there is a better source of timestamps, all events of one
        batch share a single timestamp, taken once here.
        """
        timestamp = int(time.time() * 1000)
        return [
            {"timestamp": timestamp, "message": line + "\n"}
            for line in raw_logs.split("\n")
            if line
        ]

    def _put_log_events(self, logs_client, log_item: LogItem, log_events: list[dict]) -> None:
        """Write ``log_events``, creating the log group and stream if they are missing."""
        try:
            logs_client.put_log_events(
                logGroupName=log_item.log_group,
                logStreamName=log_item.log_stream,
                logEvents=log_events,
            )
        except logs_client.exceptions.ResourceNotFoundException:
            # Either the group or the stream is missing: make sure the group
            # exists (it may already), then create the stream and retry once.
            try:
                logs_client.create_log_group(logGroupName=log_item.log_group)
            except logs_client.exceptions.ResourceAlreadyExistsException:
                pass
            logs_client.create_log_stream(
                logGroupName=log_item.log_group, logStreamName=log_item.log_stream
            )
            logs_client.put_log_events(
                logGroupName=log_item.log_group,
                logStreamName=log_item.log_stream,
                logEvents=log_events,
            )

    def run_log_loop(self, *args, **kwargs) -> None:
        """Worker loop: drain the queue and store each item's log lines.

        Returns when the shutdown sentinel is dequeued, or when the shutdown
        event is found set before the next blocking ``get``. Failures while
        storing an item are logged as warnings and do not end the loop.
        Positional and keyword arguments are accepted and ignored.
        """
        logs_client = connect_to.with_assumed_role(
            region_name=self.region,
            role_arn=self.role_arn,
            service_principal=ServicePrincipal.lambda_,
        ).logs
        while not self._shutdown_event.is_set():
            log_item = self.log_queue.get()
            if log_item is QUEUE_SHUTDOWN:
                return
            log_events = self._build_log_events(log_item.logs)
            if not log_events:
                # PutLogEvents needs at least one event; sending an empty batch
                # would only produce a spurious warning below.
                continue
            try:
                self._put_log_events(logs_client, log_item, log_events)
            except Exception as e:
                # Best effort: losing logs must not kill the worker thread.
                LOG.warning(
                    "Error saving logs to group %s in region %s: %s",
                    log_item.log_group,
                    self.region,
                    e,
                )

    def start_subscriber(self) -> None:
        """Start the background worker thread, unless the ``logs`` API is disabled."""
        if not is_api_enabled("logs"):
            LOG.debug("Service 'logs' is disabled, not storing any logs for lambda executions")
            return
        self._thread = FuncThread(self.run_log_loop, name="log_handler")
        self._thread.start()

    def add_logs(self, log_item: LogItem) -> None:
        """Queue ``log_item`` for storage; a no-op when the ``logs`` API is disabled."""
        if not is_api_enabled("logs"):
            return
        self.log_queue.put(log_item)

    def stop(self) -> None:
        """Signal the worker to exit and wait up to two seconds for it.

        The shutdown event is set first, then the sentinel is enqueued to wake a
        worker blocked on ``get``. If the thread is still alive after the join
        timeout an error is logged; the thread reference is dropped either way.
        """
        self._shutdown_event.set()
        if self._thread:
            self.log_queue.put(QUEUE_SHUTDOWN)
            self._thread.join(timeout=2)
            if self._thread.is_alive():
                LOG.error("Could not stop log subscriber in time")
            self._thread = None
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc