• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

andgineer / opensearch-log / 20292503658

17 Dec 2025 05:17AM UTC coverage: 87.805% (-0.2%) from 88.011%
20292503658

push

github

andgineer
boto3 type stubs

3 of 4 new or added lines in 1 file covered. (75.0%)

324 of 369 relevant lines covered (87.8%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.21
/src/opensearch_log/cloudwatch_handler.py
1
"""Structured logger that sends logs to AWS CloudWatch."""
2

3
import contextlib
1✔
4
import logging
1✔
5
import time
1✔
6
from threading import Lock, Timer
1✔
7
from typing import TYPE_CHECKING, Any, Optional
1✔
8

9
if TYPE_CHECKING:
1✔
10
    from mypy_boto3_logs import CloudWatchLogsClient
×
NEW
11
    from mypy_boto3_logs.type_defs import InputLogEventTypeDef
×
12
else:
13
    CloudWatchLogsClient = Any
1✔
14
    InputLogEventTypeDef = dict[str, Any]  # type: ignore[misc]
1✔
15

16
try:
1✔
17
    import boto3
1✔
18
    import botocore.errorfactory
1✔
19
    import botocore.exceptions
1✔
20
except ImportError as e:
×
21
    raise ImportError(
×
22
        "To use CloudwatchHandler please install with this feature: "
23
        "`pip install opensearch-log[cloudwatch]`.",
24
    ) from e
25

26
from opensearch_log import json_log
1✔
27
from opensearch_log.base_handler import BaseHandler
1✔
28
from opensearch_log.stdout_handler import add_stdout_json_handler
1✔
29

30
BUFFER_SIZE = 100
1✔
31
FLUSH_SECONDS = 3.0
1✔
32
RETRY_NUM = 3
1✔
33

34

35
class CloudwatchHandler(BaseHandler):  # pylint: disable=too-many-instance-attributes
    """Handler that sends log records to AWS CloudWatch.

    Log events are buffered and flushed to CloudWatch either when the buffer
    reaches ``BUFFER_SIZE`` events or ``FLUSH_SECONDS`` seconds after the
    first buffered event, whichever comes first.
    """

    def __init__(self, *args: Any, log_group: str, log_stream: str, **kwargs: Any) -> None:
        """Initialize the handler.

        Args:
            log_group: CloudWatch log group name; created if it does not exist.
            log_stream: CloudWatch log stream name; created if it does not exist.
        """
        super().__init__(*args, **kwargs)
        self._log_client: Optional[CloudWatchLogsClient] = None  # lazily created boto3 client
        self.log_group = log_group
        self.log_stream = log_stream
        self.buffer_size = BUFFER_SIZE
        self.flush_seconds = FLUSH_SECONDS
        self._buffer: list[InputLogEventTypeDef] = []
        self._buffer_lock: Lock = Lock()
        self._timer: Optional[Timer] = None

        self._initialize_log_group()
        self._initialize_log_stream()

    def _initialize_log_stream(self) -> None:
        """Create the log stream if it doesn't already exist."""
        with contextlib.suppress(self.log_client.exceptions.ResourceAlreadyExistsException):
            self.log_client.create_log_stream(
                logGroupName=self.log_group,
                logStreamName=self.log_stream,
            )

    def _initialize_log_group(self) -> None:
        """Create the log group if it doesn't already exist.

        Ignore access deny just in case the group really exists,
        but we do not have permission to create it and so the command will fail.
        If the group does not exist, and we fail to create it, in posting log messages we get
        more understandable error "The specified log group does not exist".
        """
        try:
            self.log_client.create_log_group(logGroupName=self.log_group)
        except self.log_client.exceptions.ResourceAlreadyExistsException:
            # Log group already exists. No action needed.
            pass
        except botocore.exceptions.ClientError as exc:
            # Reraise anything except an access-denial (see docstring for why
            # AccessDeniedException is tolerated here).
            if exc.response["Error"]["Code"] != "AccessDeniedException":
                raise

    @property
    def log_client(self) -> CloudWatchLogsClient:
        """Get the boto3 client for CloudWatch logs (created on first use)."""
        if self._log_client is None:
            # Disable boto's built-in logging to avoid recursion: boto would
            # otherwise log through handlers that call back into boto.
            boto3.set_stream_logger("boto3", logging.CRITICAL)
            boto3.set_stream_logger("botocore", logging.CRITICAL)

            self._log_client = boto3.client("logs")
        return self._log_client

    def send_message(self, message: Optional[str], record: logging.LogRecord) -> None:  # noqa: ARG002
        """Buffer the log message and flush if necessary.

        Args:
            message: Already-formatted log text; ``None`` becomes an empty string.
            record: The originating log record (unused here, required by the API).
        """
        timestamp = int(round(time.time() * 1000))  # CloudWatch wants epoch milliseconds
        log_event: InputLogEventTypeDef = {"timestamp": timestamp, "message": message or ""}

        # Append and decide whether to flush while holding the lock, so the
        # size check cannot race with concurrent appends or with flush()
        # swapping the buffer out from under us.
        with self._buffer_lock:
            self._buffer.append(log_event)
            should_flush = len(self._buffer) >= self.buffer_size

        if should_flush:
            self.flush()
        else:
            self._schedule_flush()

    def flush(self) -> None:
        """Flush the buffer to CloudWatch.

        Could run from Timer's thread or from the main thread.
        So we should not use json_formatter's routines that access ThreadLocal variables.
        """
        # hasattr guards against flush() being invoked before __init__ finished
        # (e.g. during interpreter shutdown) — keep it.
        if hasattr(self, "_timer") and self._timer is not None and self._timer.is_alive():
            self._timer.cancel()
        self._timer = None

        # The empty-check must happen inside the lock together with the swap;
        # otherwise an event appended between the check and the swap could be
        # stranded in a buffer that no longer gets flushed on schedule.
        with self._buffer_lock:
            if not self._buffer:
                return
            logs_buffer = self._buffer
            self._buffer = []

        retries = 0
        while retries < RETRY_NUM:
            try:
                self.log_client.put_log_events(
                    logGroupName=self.log_group,
                    logStreamName=self.log_stream,
                    logEvents=logs_buffer,
                )
                break
            except botocore.exceptions.ClientError as exc:
                retries += 1
                # print() instead of logging: logging here could recurse into
                # this very handler.
                print(f"Retry {retries}: Could not send logs to CloudWatch: {exc}")
                if retries >= RETRY_NUM:
                    print("Exhausted retries. Lost logs:\n", logs_buffer)

    def _schedule_flush(self) -> None:
        """Schedule a delayed flush if one is not already pending."""
        # NOTE(review): this check is not under a lock, so two threads could in
        # principle both start timers; the duplicate flush is harmless (the
        # second finds an empty buffer) — confirm if stricter guarantees needed.
        if self._timer is None:
            self._timer = Timer(self.flush_seconds, self.flush)
            self._timer.daemon = True  # do not keep the process alive
            self._timer.start()

    def close(self) -> None:
        """Flush the buffer and release any outstanding resource."""
        self.flush()
        super().close()
147

148

149
def get_logger(
    *args: Any,
    echo_stdout: bool = False,
    log_group: str,
    log_stream: str,
    log_handler: Optional[BaseHandler] = None,
    **kwargs: Any,
) -> logging.Logger:
    """Create a logger that streams logs to CloudWatch.

    Args:
        echo_stdout: If True, also emit JSON logs to stdout.
        log_group: CloudWatch log group name.
        log_stream: CloudWatch log stream name.
        log_handler: Must be None — the CloudWatch handler is created
            internally; the parameter exists only for signature compatibility.

    Returns:
        The configured logger.

    Raises:
        ValueError: If ``log_handler`` is supplied.
    """
    # Explicit validation instead of `assert`: asserts are stripped under
    # `python -O`, which would silently ignore a caller-supplied handler.
    if log_handler is not None:
        raise ValueError("log_handler should not be specified")
    logger = json_log.get_logger(
        *args,
        log_handler=CloudwatchHandler(log_group=log_group, log_stream=log_stream),
        **kwargs,
    )
    if echo_stdout:
        add_stdout_json_handler(logger)
    return logger
167

168

169
def remove_logger() -> None:
    """Flush and remove all handlers from the global logger.

    Thin convenience wrapper delegating to :func:`json_log.remove_logger`.
    """
    json_log.remove_logger()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc