• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

andgineer / opensearch-log / 20190000179

13 Dec 2025 09:16AM UTC coverage: 88.011% (-0.2%) from 88.187%
20190000179

push

github

andgineer
pyrefly

5 of 6 new or added lines in 2 files covered. (83.33%)

323 of 367 relevant lines covered (88.01%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.85
/src/opensearch_log/cloudwatch_handler.py
1
"""Structured logger that sends logs to AWS CloudWatch."""
2

3
import contextlib
1✔
4
import logging
1✔
5
import time
1✔
6
from threading import Lock, Timer
1✔
7
from typing import TYPE_CHECKING, Any, Optional
1✔
8

9
if TYPE_CHECKING:
1✔
NEW
10
    from mypy_boto3_logs import CloudWatchLogsClient
×
11
else:
12
    CloudWatchLogsClient = Any
1✔
13

14
try:
1✔
15
    import boto3
1✔
16
    import botocore.errorfactory
1✔
17
    import botocore.exceptions
1✔
18
except ImportError as e:
×
19
    raise ImportError(
×
20
        "To use CloudwatchHandler please install with this feature: "
21
        "`pip install opensearch-log[cloudwatch]`.",
22
    ) from e
23

24
from opensearch_log import json_log
1✔
25
from opensearch_log.base_handler import BaseHandler
1✔
26
from opensearch_log.stdout_handler import add_stdout_json_handler
1✔
27

28
BUFFER_SIZE = 100
1✔
29
FLUSH_SECONDS = 3.0
1✔
30
RETRY_NUM = 3
1✔
31

32

33
class CloudwatchHandler(BaseHandler):  # pylint: disable=too-many-instance-attributes
1✔
34
    """Handler that sends log records to AWS CloudWatch."""
35

36
    def __init__(self, *args: Any, log_group: str, log_stream: str, **kwargs: Any) -> None:
1✔
37
        """Initialize the handler."""
38
        super().__init__(*args, **kwargs)
1✔
39
        self._log_client = None
1✔
40
        self.log_group = log_group
1✔
41
        self.log_stream = log_stream
1✔
42
        self.buffer_size = BUFFER_SIZE
1✔
43
        self.flush_seconds = FLUSH_SECONDS
1✔
44
        self._buffer: list[dict[str, Any]] = []
1✔
45
        self._buffer_lock: Lock = Lock()
1✔
46
        self._timer: Optional[Timer] = None
1✔
47

48
        self._initialize_log_group()
1✔
49
        self._initialize_log_stream()
1✔
50

51
    def _initialize_log_stream(self) -> None:
1✔
52
        """Create the log stream if it doesn't already exist."""
53
        with contextlib.suppress(self.log_client.exceptions.ResourceAlreadyExistsException):
1✔
54
            self.log_client.create_log_stream(
1✔
55
                logGroupName=self.log_group,
56
                logStreamName=self.log_stream,
57
            )
58

59
    def _initialize_log_group(self) -> None:
1✔
60
        """Create the log group if it doesn't already exist.
61

62
        Ignore access deny just in case the group really exists,
63
        but we do not have permission to create it and so the command will fail.
64
        If the group does not exist, and we fail to create it, in posting log messages we get
65
        more understandable error "The specified log group does not exist".
66
        """
67
        try:
1✔
68
            self.log_client.create_log_group(logGroupName=self.log_group)
1✔
69
        except self.log_client.exceptions.ResourceAlreadyExistsException:
1✔
70
            # Log group already exists. No action needed.
71
            pass
1✔
72
        except botocore.exceptions.ClientError as exc:
×
73
            # Check if the error is access denied error
74
            if exc.response["Error"]["Code"] != "AccessDeniedException":
×
75
                # Reraise the exception if it's not an AccessDeniedException
76
                raise
×
77

78
    @property
1✔
79
    def log_client(self) -> CloudWatchLogsClient:
1✔
80
        """Get the boto3 client for CloudWatch logs."""
81
        if self._log_client is None:
1✔
82
            # Disable boto's built-in logging to avoid recursion
83
            boto3.set_stream_logger("boto3", logging.CRITICAL)
1✔
84
            boto3.set_stream_logger("botocore", logging.CRITICAL)
1✔
85

86
            self._log_client = boto3.client("logs")
1✔
87
        return self._log_client
1✔
88

89
    def send_message(self, message: Optional[str], record: logging.LogRecord) -> None:  # noqa: ARG002
1✔
90
        """Buffer the log message and flush if necessary."""
91
        timestamp = int(round(time.time() * 1000))
1✔
92
        log_event = {"timestamp": timestamp, "message": message}
1✔
93

94
        with self._buffer_lock:
1✔
95
            self._buffer.append(log_event)
1✔
96

97
        if len(self._buffer) >= self.buffer_size:
1✔
98
            self.flush()
×
99
        else:
100
            self._schedule_flush()
1✔
101

102
    def flush(self) -> None:
1✔
103
        """Flush the buffer to CloudWatch.
104

105
        Could run from Timer's thread or from the main thread.
106
        So we should not use json_formatter's routines that access ThreadLocal variables.
107
        """
108
        if hasattr(self, "_timer") and self._timer is not None and self._timer.is_alive():
1✔
109
            self._timer.cancel()
1✔
110
        self._timer = None
1✔
111

112
        if not self._buffer:
1✔
113
            return
1✔
114

115
        with self._buffer_lock:
1✔
116
            logs_buffer = self._buffer
1✔
117
            self._buffer = []
1✔
118

119
        retries = 0
1✔
120
        while retries < RETRY_NUM:
1✔
121
            try:
1✔
122
                self.log_client.put_log_events(
1✔
123
                    logGroupName=self.log_group,
124
                    logStreamName=self.log_stream,
125
                    logEvents=logs_buffer,
126
                )
127
                break
1✔
128
            except botocore.exceptions.ClientError as exc:
×
129
                retries += 1
×
130
                print(f"Retry {retries}: Could not send logs to CloudWatch: {exc}")
×
131
                if retries >= RETRY_NUM:
×
132
                    print("Exhausted retries. Lost logs:\n", logs_buffer)
×
133

134
    def _schedule_flush(self) -> None:
1✔
135
        """Schedule a flush operation."""
136
        if self._timer is None:
1✔
137
            self._timer = Timer(self.flush_seconds, self.flush)
1✔
138
            self._timer.daemon = True
1✔
139
            self._timer.start()
1✔
140

141
    def close(self) -> None:
1✔
142
        """Flush the buffer and release any outstanding resource."""
143
        self.flush()
1✔
144
        super().close()
1✔
145

146

147
def get_logger(
1✔
148
    *args: Any,
149
    echo_stdout: bool = False,
150
    log_group: str,
151
    log_stream: str,
152
    log_handler: Optional[BaseHandler] = None,
153
    **kwargs: Any,
154
) -> logging.Logger:
155
    """Create a logger that stream logs to CloudWatch."""
156
    assert log_handler is None, "log_handler should not be specified"
×
157
    logger = json_log.get_logger(
×
158
        *args,
159
        log_handler=CloudwatchHandler(log_group=log_group, log_stream=log_stream),
160
        **kwargs,
161
    )
162
    if echo_stdout:
×
163
        add_stdout_json_handler(logger)
×
164
    return logger
×
165

166

167
def remove_logger() -> None:
1✔
168
    """Flush and remove all handlers from the global logger."""
169
    json_log.remove_logger()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc