• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 22048723723

13 Feb 2026 06:59PM UTC coverage: 87.006% (+0.1%) from 86.883%
22048723723

push

github

web-flow
CW Logs: Test suite for service internalization (#13692)

22 of 22 new or added lines in 1 file covered. (100.0%)

928 existing lines in 33 files now uncovered.

69716 of 80128 relevant lines covered (87.01%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.28
/localstack-core/localstack/services/events/models.py
1
import uuid
1✔
2
from dataclasses import dataclass, field
1✔
3
from datetime import UTC, datetime
1✔
4
from enum import Enum
1✔
5
from typing import Any, TypedDict
1✔
6

7
from localstack.aws.api import CommonServiceException
1✔
8
from localstack.aws.api.events import (
1✔
9
    ApiDestinationDescription,
10
    ApiDestinationHttpMethod,
11
    ApiDestinationInvocationRateLimitPerSecond,
12
    ApiDestinationName,
13
    ApiDestinationState,
14
    ArchiveDescription,
15
    ArchiveName,
16
    ArchiveState,
17
    Arn,
18
    ConnectionArn,
19
    ConnectionAuthorizationType,
20
    ConnectionDescription,
21
    ConnectionName,
22
    ConnectionState,
23
    ConnectivityResourceParameters,
24
    CreateConnectionAuthRequestParameters,
25
    CreatedBy,
26
    EventBusName,
27
    EventPattern,
28
    EventResourceList,
29
    EventSourceName,
30
    HttpsEndpoint,
31
    ManagedBy,
32
    ReplayDescription,
33
    ReplayDestination,
34
    ReplayName,
35
    ReplayState,
36
    ReplayStateReason,
37
    RetentionDays,
38
    RoleArn,
39
    RuleDescription,
40
    RuleName,
41
    RuleState,
42
    ScheduleExpression,
43
    TagList,
44
    Target,
45
    TargetId,
46
    Timestamp,
47
)
48
from localstack.services.stores import (
1✔
49
    AccountRegionBundle,
50
    BaseStore,
51
    CrossRegionAttribute,
52
    LocalAttribute,
53
)
54
from localstack.utils.aws.arns import (
1✔
55
    event_bus_arn,
56
    events_api_destination_arn,
57
    events_archive_arn,
58
    events_connection_arn,
59
    events_replay_arn,
60
    events_rule_arn,
61
)
62
from localstack.utils.strings import short_uid
1✔
63
from localstack.utils.tagging import TaggingService
1✔
64

65
TargetDict = dict[TargetId, Target]
1✔
66

67

68
class ValidationException(CommonServiceException):
1✔
69
    def __init__(self, message: str):
1✔
70
        super().__init__("ValidationException", message, 400, True)
1✔
71

72

73
class InvalidEventPatternException(Exception):
1✔
74
    reason: str
1✔
75

76
    def __init__(self, reason=None, message=None) -> None:
1✔
UNCOV
77
        self.reason = reason
×
78
        self.message = message or f"Event pattern is not valid. Reason: {reason}"
×
79

80

81
FormattedEvent = TypedDict(  # functional syntax required due to name-name keys
1✔
82
    "FormattedEvent",
83
    {
84
        "version": str,
85
        "id": str,
86
        "detail-type": str | None,
87
        "source": EventSourceName | None,
88
        "account": str,
89
        "time": str,
90
        "region": str,
91
        "resources": EventResourceList | None,
92
        "detail": dict[str, Any],
93
        "replay-name": ReplayName | None,
94
        "event-bus-name": EventBusName,
95
    },
96
)
97

98

99
FormattedEventDict = dict[str, FormattedEvent]
1✔
100
FormattedEventList = list[FormattedEvent]
1✔
101

102
type TransformedEvent = FormattedEvent | dict | str
1✔
103

104

105
class ResourceType(Enum):
1✔
106
    EVENT_BUS = "event_bus"
1✔
107
    RULE = "rule"
1✔
108

109

110
class Statement(TypedDict):
1✔
111
    Sid: str | None
1✔
112
    Effect: str
1✔
113
    Principal: str | dict[str, str] | None
1✔
114
    Action: str | list[str]
1✔
115
    Resource: str | list[str]
1✔
116
    Condition: dict[str, Any] | None
1✔
117

118

119
class ResourcePolicy(TypedDict):
1✔
120
    Version: str
1✔
121
    Statement: list[Statement]
1✔
122

123

124
@dataclass
1✔
125
class Rule:
1✔
126
    name: RuleName
1✔
127
    region: str
1✔
128
    account_id: str
1✔
129
    schedule_expression: ScheduleExpression | None = None
1✔
130
    event_pattern: EventPattern | None = None
1✔
131
    state: RuleState | None = None
1✔
132
    description: RuleDescription | None = None
1✔
133
    role_arn: RoleArn | None = None
1✔
134
    tags: TagList = field(default_factory=list)
1✔
135
    event_bus_name: EventBusName = "default"
1✔
136
    targets: TargetDict = field(default_factory=dict)
1✔
137
    managed_by: ManagedBy | None = None  # can only be set by AWS services
1✔
138
    created_by: CreatedBy = field(init=False)
1✔
139

140
    def __post_init__(self):
1✔
141
        self.created_by = self.account_id
1✔
142
        if self.tags is None:
1✔
143
            self.tags = []
1✔
144
        if self.targets is None:
1✔
145
            self.targets = {}
1✔
146
        if self.state is None:
1✔
147
            self.state = RuleState.ENABLED
1✔
148

149
    @property
1✔
150
    def arn(self) -> Arn:
1✔
151
        return events_rule_arn(self.name, self.account_id, self.region, self.event_bus_name)
1✔
152

153

154
RuleDict = dict[RuleName, Rule]
1✔
155

156

157
@dataclass
1✔
158
class Replay:
1✔
159
    name: str
1✔
160
    region: str
1✔
161
    account_id: str
1✔
162
    event_source_arn: Arn
1✔
163
    destination: ReplayDestination  # Event Bus Arn or Rule Arns
1✔
164
    event_start_time: Timestamp
1✔
165
    event_end_time: Timestamp
1✔
166
    description: ReplayDescription | None = None
1✔
167
    state: ReplayState | None = None
1✔
168
    state_reason: ReplayStateReason | None = None
1✔
169
    event_last_replayed_time: Timestamp | None = None
1✔
170
    replay_start_time: Timestamp | None = None
1✔
171
    replay_end_time: Timestamp | None = None
1✔
172

173
    @property
1✔
174
    def arn(self) -> Arn:
1✔
175
        return events_replay_arn(self.name, self.account_id, self.region)
1✔
176

177

178
ReplayDict = dict[ReplayName, Replay]
1✔
179

180

181
@dataclass
1✔
182
class Archive:
1✔
183
    name: ArchiveName
1✔
184
    region: str
1✔
185
    account_id: str
1✔
186
    event_source_arn: Arn
1✔
187
    description: ArchiveDescription | None = None
1✔
188
    event_pattern: EventPattern | None = None
1✔
189
    retention_days: RetentionDays | None = None
1✔
190
    state: ArchiveState = field(init=False, default=ArchiveState.DISABLED)
1✔
191
    creation_time: Timestamp = field(init=False, default_factory=lambda: datetime.now(UTC))
1✔
192
    events: FormattedEventDict = field(init=False, default_factory=dict)
1✔
193
    # TODO how to deal with updating this value?
194
    size_bytes: int = field(init=False, default=0)
1✔
195

196
    @property
1✔
197
    def arn(self) -> Arn:
1✔
198
        return events_archive_arn(self.name, self.account_id, self.region)
1✔
199

200
    @property
1✔
201
    def event_count(self) -> int:
1✔
202
        return len(self.events)
1✔
203

204

205
ArchiveDict = dict[ArchiveName, Archive]
1✔
206

207

208
@dataclass
1✔
209
class EventBus:
1✔
210
    name: EventBusName
1✔
211
    region: str
1✔
212
    account_id: str
1✔
213
    event_source_name: str | None = None
1✔
214
    description: str | None = None
1✔
215
    tags: TagList = field(default_factory=list)
1✔
216
    policy: ResourcePolicy | None = None
1✔
217
    rules: RuleDict = field(default_factory=dict)
1✔
218
    creation_time: Timestamp = field(init=False, default_factory=lambda: datetime.now(UTC))
1✔
219
    last_modified_time: Timestamp = field(init=False, default_factory=lambda: datetime.now(UTC))
1✔
220

221
    def __post_init__(self):
1✔
222
        if self.rules is None:
1✔
223
            self.rules = {}
1✔
224
        if self.tags is None:
1✔
225
            self.tags = []
1✔
226

227
    @property
1✔
228
    def arn(self) -> Arn:
1✔
229
        return event_bus_arn(self.name, self.account_id, self.region)
1✔
230

231

232
EventBusDict = dict[EventBusName, EventBus]
1✔
233

234

235
@dataclass
1✔
236
class Connection:
1✔
237
    name: ConnectionName
1✔
238
    region: str
1✔
239
    account_id: str
1✔
240
    authorization_type: ConnectionAuthorizationType
1✔
241
    auth_parameters: CreateConnectionAuthRequestParameters
1✔
242
    state: ConnectionState
1✔
243
    secret_arn: Arn
1✔
244
    description: ConnectionDescription | None = None
1✔
245
    invocation_connectivity_parameters: ConnectivityResourceParameters | None = None
1✔
246
    creation_time: Timestamp = field(init=False, default_factory=lambda: datetime.now(UTC))
1✔
247
    last_modified_time: Timestamp = field(init=False, default_factory=lambda: datetime.now(UTC))
1✔
248
    last_authorized_time: Timestamp = field(init=False, default_factory=lambda: datetime.now(UTC))
1✔
249
    tags: TagList = field(default_factory=list)
1✔
250
    id: str = str(uuid.uuid4())
1✔
251

252
    def __post_init__(self):
1✔
253
        if self.tags is None:
1✔
UNCOV
254
            self.tags = []
×
255

256
    @property
1✔
257
    def arn(self) -> Arn:
1✔
258
        return events_connection_arn(self.name, self.id, self.account_id, self.region)
1✔
259

260

261
ConnectionDict = dict[ConnectionName, Connection]
1✔
262

263

264
@dataclass
1✔
265
class ApiDestination:
1✔
266
    name: ApiDestinationName
1✔
267
    region: str
1✔
268
    account_id: str
1✔
269
    connection_arn: ConnectionArn
1✔
270
    invocation_endpoint: HttpsEndpoint
1✔
271
    http_method: ApiDestinationHttpMethod
1✔
272
    state: ApiDestinationState
1✔
273
    _invocation_rate_limit_per_second: ApiDestinationInvocationRateLimitPerSecond | None = None
1✔
274
    description: ApiDestinationDescription | None = None
1✔
275
    creation_time: Timestamp = field(init=False, default_factory=lambda: datetime.now(UTC))
1✔
276
    last_modified_time: Timestamp = field(init=False, default_factory=lambda: datetime.now(UTC))
1✔
277
    last_authorized_time: Timestamp = field(init=False, default_factory=lambda: datetime.now(UTC))
1✔
278
    tags: TagList = field(default_factory=list)
1✔
279
    id: str = str(short_uid())
1✔
280

281
    def __post_init__(self):
1✔
282
        if self.tags is None:
1✔
UNCOV
283
            self.tags = []
×
284

285
    @property
1✔
286
    def arn(self) -> Arn:
1✔
287
        return events_api_destination_arn(self.name, self.id, self.account_id, self.region)
1✔
288

289
    @property
1✔
290
    def invocation_rate_limit_per_second(self) -> int:
1✔
291
        return self._invocation_rate_limit_per_second or 300  # Default value
1✔
292

293
    @invocation_rate_limit_per_second.setter
1✔
294
    def invocation_rate_limit_per_second(
1✔
295
        self, value: ApiDestinationInvocationRateLimitPerSecond | None
296
    ):
UNCOV
297
        self._invocation_rate_limit_per_second = value
×
298

299

300
ApiDestinationDict = dict[ApiDestinationName, ApiDestination]
1✔
301

302

303
class EventsStore(BaseStore):
1✔
304
    # Map of eventbus names to eventbus objects. The name MUST be unique per account and region (works with AccountRegionBundle)
305
    event_buses: EventBusDict = LocalAttribute(default=dict)
1✔
306

307
    # Map of archive names to archive objects. The name MUST be unique per account and region (works with AccountRegionBundle)
308
    archives: ArchiveDict = LocalAttribute(default=dict)
1✔
309

310
    # Map of replay names to replay objects. The name MUST be unique per account and region (works with AccountRegionBundle)
311
    replays: ReplayDict = LocalAttribute(default=dict)
1✔
312

313
    # Map of connection names to connection objects.
314
    connections: ConnectionDict = LocalAttribute(default=dict)
1✔
315

316
    # Map of api destination names to api destination objects
317
    api_destinations: ApiDestinationDict = LocalAttribute(default=dict)
1✔
318

319
    # Maps resource ARN to tags
320
    TAGS: TaggingService = CrossRegionAttribute(default=TaggingService)
1✔
321

322

323
events_stores = AccountRegionBundle("events", EventsStore)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc