• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 22519085314

27 Feb 2026 11:47PM UTC coverage: 86.962% (+0.006%) from 86.956%
22519085314

push

github

web-flow
SNS: update store typing (#13866)

3 of 3 new or added lines in 1 file covered. (100.0%)

388 existing lines in 19 files now uncovered.

69828 of 80297 relevant lines covered (86.96%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.18
/localstack-core/localstack/services/events/models.py
1
import uuid
1✔
2
from dataclasses import dataclass, field
1✔
3
from datetime import UTC, datetime
1✔
4
from enum import Enum
1✔
5
from typing import Any, TypedDict
1✔
6

7
from localstack.aws.api import CommonServiceException
1✔
8
from localstack.aws.api.events import (
1✔
9
    ApiDestinationDescription,
10
    ApiDestinationHttpMethod,
11
    ApiDestinationInvocationRateLimitPerSecond,
12
    ApiDestinationName,
13
    ApiDestinationState,
14
    ArchiveDescription,
15
    ArchiveName,
16
    ArchiveState,
17
    Arn,
18
    ConnectionArn,
19
    ConnectionAuthorizationType,
20
    ConnectionDescription,
21
    ConnectionName,
22
    ConnectionState,
23
    ConnectivityResourceParameters,
24
    CreateConnectionAuthRequestParameters,
25
    CreatedBy,
26
    EventBusName,
27
    EventPattern,
28
    EventResourceList,
29
    EventSourceName,
30
    HttpsEndpoint,
31
    ManagedBy,
32
    ReplayDescription,
33
    ReplayDestination,
34
    ReplayName,
35
    ReplayState,
36
    ReplayStateReason,
37
    RetentionDays,
38
    RoleArn,
39
    RuleDescription,
40
    RuleName,
41
    RuleState,
42
    ScheduleExpression,
43
    TagList,
44
    Target,
45
    TargetId,
46
    Timestamp,
47
)
48
from localstack.services.stores import (
1✔
49
    AccountRegionBundle,
50
    BaseStore,
51
    CrossRegionAttribute,
52
    LocalAttribute,
53
)
54
from localstack.utils.aws.arns import (
1✔
55
    event_bus_arn,
56
    events_api_destination_arn,
57
    events_archive_arn,
58
    events_connection_arn,
59
    events_replay_arn,
60
    events_rule_arn,
61
)
62
from localstack.utils.strings import short_uid
1✔
63
from localstack.utils.tagging import TaggingService
1✔
64

65
# Maps a target id to its Target definition.
TargetDict = dict[TargetId, Target]
1✔
66

67

68
class ValidationException(CommonServiceException):
    """Service exception raised with the error code ``ValidationException``."""

    def __init__(self, message: str):
        # The trailing positional arguments are forwarded unchanged to
        # CommonServiceException; presumably the HTTP status code and the
        # sender-fault flag -- confirm against the base class signature.
        super().__init__(
            "ValidationException",
            message,
            400,
            True,
        )
1✔
71

72

73
class InvalidEventPatternException(Exception):
1✔
74
    reason: str
1✔
75

76
    def __init__(self, reason=None, message=None) -> None:
1✔
77
        self.reason = reason
×
78
        self.message = message or f"Event pattern is not valid. Reason: {reason}"
×
79

80

81
# Typed shape of a formatted event.
# The functional TypedDict syntax is required because several keys (for example
# "detail-type", "replay-name", "event-bus-name") contain hyphens and therefore
# cannot be written as class attributes.
FormattedEvent = TypedDict(  # functional syntax required due to hyphenated keys
    "FormattedEvent",
    {
        "version": str,
        "id": str,
        "detail-type": str | None,
        "source": EventSourceName | None,
        "account": str,
        "time": str,
        "region": str,
        "resources": EventResourceList | None,
        "detail": dict[str, Any],
        "replay-name": ReplayName | None,
        "event-bus-name": EventBusName,
    },
)
97

98

99
# Formatted events keyed by a string (presumably the event id -- confirm with callers).
FormattedEventDict = dict[str, FormattedEvent]
# Ordered collection of formatted events.
FormattedEventList = list[FormattedEvent]

# A transformed event is either a formatted event, a plain dict, or a string.
type TransformedEvent = FormattedEvent | dict | str
1✔
103

104

105
class ResourceType(Enum):
    """Kinds of resources distinguished by this module."""

    # Member values are the lowercase snake_case names of the resource kinds.
    EVENT_BUS = "event_bus"
    RULE = "rule"
1✔
108

109

110
class ResourcePolicy(TypedDict):
    """Typed shape of a resource policy document."""

    # Policy version string.
    Version: str
    # Policy statements, each a free-form mapping.
    Statement: list[dict[str, Any]]
1✔
113

114

115
@dataclass
class Rule:
    """In-memory model of a rule attached to an event bus.

    ``created_by`` is not an init parameter; it is always set to ``account_id``
    after construction. ``None`` passed for ``tags``, ``targets`` or ``state``
    is replaced by an empty list, an empty dict and ``RuleState.ENABLED``
    respectively.
    """

    name: RuleName
    region: str
    account_id: str
    schedule_expression: ScheduleExpression | None = None
    event_pattern: EventPattern | None = None
    state: RuleState | None = None
    description: RuleDescription | None = None
    role_arn: RoleArn | None = None
    tags: TagList = field(default_factory=list)
    event_bus_name: EventBusName = "default"
    targets: TargetDict = field(default_factory=dict)
    managed_by: ManagedBy | None = None  # can only be set by AWS services
    created_by: CreatedBy = field(init=False)

    def __post_init__(self):
        """Derive ``created_by`` and normalise explicitly passed ``None`` values."""
        self.created_by = self.account_id
        # Callers may pass None explicitly, which bypasses the field defaults.
        self.tags = [] if self.tags is None else self.tags
        self.targets = {} if self.targets is None else self.targets
        self.state = RuleState.ENABLED if self.state is None else self.state

    @property
    def arn(self) -> Arn:
        """ARN built from the rule name, account, region and event bus name."""
        return events_rule_arn(
            self.name,
            self.account_id,
            self.region,
            self.event_bus_name,
        )
1✔
143

144

145
# Rules keyed by rule name.
RuleDict = dict[RuleName, Rule]
1✔
146

147

148
@dataclass
class Replay:
    """In-memory model of a replay of events from an event source to a destination."""

    name: str
    region: str
    account_id: str
    event_source_arn: Arn
    destination: ReplayDestination  # Event Bus Arn or Rule Arns
    # Time window of the events to replay.
    event_start_time: Timestamp
    event_end_time: Timestamp
    description: ReplayDescription | None = None
    # State and progress fields; all default to None in this model.
    state: ReplayState | None = None
    state_reason: ReplayStateReason | None = None
    event_last_replayed_time: Timestamp | None = None
    replay_start_time: Timestamp | None = None
    replay_end_time: Timestamp | None = None

    @property
    def arn(self) -> Arn:
        """ARN built from the replay name, account id and region."""
        return events_replay_arn(self.name, self.account_id, self.region)
1✔
167

168

169
# Replays keyed by replay name.
ReplayDict = dict[ReplayName, Replay]
1✔
170

171

172
@dataclass
class Archive:
    """In-memory model of an event archive.

    ``state``, ``creation_time``, ``events`` and ``size_bytes`` are not init
    parameters; they start as ``ArchiveState.DISABLED``, the current UTC time,
    an empty dict and ``0``.
    """

    name: ArchiveName
    region: str
    account_id: str
    event_source_arn: Arn
    description: ArchiveDescription | None = None
    event_pattern: EventPattern | None = None
    retention_days: RetentionDays | None = None
    state: ArchiveState = field(default=ArchiveState.DISABLED, init=False)
    # Timezone-aware UTC timestamp taken when the instance is constructed.
    creation_time: Timestamp = field(default_factory=lambda: datetime.now(tz=UTC), init=False)
    # Archived events, stored per key in a FormattedEventDict.
    events: FormattedEventDict = field(default_factory=dict, init=False)
    # TODO how to deal with updating this value?
    size_bytes: int = field(default=0, init=False)

    @property
    def arn(self) -> Arn:
        """ARN built from the archive name, account id and region."""
        return events_archive_arn(self.name, self.account_id, self.region)

    @property
    def event_count(self) -> int:
        """Number of events currently held in ``events``."""
        stored_events = self.events
        return len(stored_events)
1✔
194

195

196
# Archives keyed by archive name.
ArchiveDict = dict[ArchiveName, Archive]
1✔
197

198

199
@dataclass
1✔
200
class EventBus:
1✔
201
    name: EventBusName
1✔
202
    region: str
1✔
203
    account_id: str
1✔
204
    event_source_name: str | None = None
1✔
205
    description: str | None = None
1✔
206
    tags: TagList = field(default_factory=list)
1✔
207
    policy: ResourcePolicy | None = None
1✔
208
    rules: RuleDict = field(default_factory=dict)
1✔
209
    creation_time: Timestamp = field(init=False, default_factory=lambda: datetime.now(UTC))
1✔
210
    last_modified_time: Timestamp = field(init=False, default_factory=lambda: datetime.now(UTC))
1✔
211

212
    def __post_init__(self):
1✔
213
        if self.rules is None:
1✔
214
            self.rules = {}
1✔
215
        if self.tags is None:
1✔
216
            self.tags = []
1✔
217

218
    @property
1✔
219
    def arn(self) -> Arn:
1✔
220
        return event_bus_arn(self.name, self.account_id, self.region)
1✔
221

222

223
# Event buses keyed by event bus name.
EventBusDict = dict[EventBusName, EventBus]
1✔
224

225

226
@dataclass
1✔
227
class Connection:
1✔
228
    name: ConnectionName
1✔
229
    region: str
1✔
230
    account_id: str
1✔
231
    authorization_type: ConnectionAuthorizationType
1✔
232
    auth_parameters: CreateConnectionAuthRequestParameters
1✔
233
    state: ConnectionState
1✔
234
    secret_arn: Arn
1✔
235
    description: ConnectionDescription | None = None
1✔
236
    invocation_connectivity_parameters: ConnectivityResourceParameters | None = None
1✔
237
    creation_time: Timestamp = field(init=False, default_factory=lambda: datetime.now(UTC))
1✔
238
    last_modified_time: Timestamp = field(init=False, default_factory=lambda: datetime.now(UTC))
1✔
239
    last_authorized_time: Timestamp = field(init=False, default_factory=lambda: datetime.now(UTC))
1✔
240
    tags: TagList = field(default_factory=list)
1✔
241
    id: str = str(uuid.uuid4())
1✔
242

243
    def __post_init__(self):
1✔
244
        if self.tags is None:
1✔
UNCOV
245
            self.tags = []
×
246

247
    @property
1✔
248
    def arn(self) -> Arn:
1✔
249
        return events_connection_arn(self.name, self.id, self.account_id, self.region)
1✔
250

251

252
# Connections keyed by connection name.
ConnectionDict = dict[ConnectionName, Connection]
1✔
253

254

255
@dataclass
1✔
256
class ApiDestination:
1✔
257
    name: ApiDestinationName
1✔
258
    region: str
1✔
259
    account_id: str
1✔
260
    connection_arn: ConnectionArn
1✔
261
    invocation_endpoint: HttpsEndpoint
1✔
262
    http_method: ApiDestinationHttpMethod
1✔
263
    state: ApiDestinationState
1✔
264
    _invocation_rate_limit_per_second: ApiDestinationInvocationRateLimitPerSecond | None = None
1✔
265
    description: ApiDestinationDescription | None = None
1✔
266
    creation_time: Timestamp = field(init=False, default_factory=lambda: datetime.now(UTC))
1✔
267
    last_modified_time: Timestamp = field(init=False, default_factory=lambda: datetime.now(UTC))
1✔
268
    last_authorized_time: Timestamp = field(init=False, default_factory=lambda: datetime.now(UTC))
1✔
269
    tags: TagList = field(default_factory=list)
1✔
270
    id: str = str(short_uid())
1✔
271

272
    def __post_init__(self):
1✔
273
        if self.tags is None:
1✔
UNCOV
274
            self.tags = []
×
275

276
    @property
1✔
277
    def arn(self) -> Arn:
1✔
278
        return events_api_destination_arn(self.name, self.id, self.account_id, self.region)
1✔
279

280
    @property
1✔
281
    def invocation_rate_limit_per_second(self) -> int:
1✔
282
        return self._invocation_rate_limit_per_second or 300  # Default value
1✔
283

284
    @invocation_rate_limit_per_second.setter
1✔
285
    def invocation_rate_limit_per_second(
1✔
286
        self, value: ApiDestinationInvocationRateLimitPerSecond | None
287
    ):
UNCOV
288
        self._invocation_rate_limit_per_second = value
×
289

290

291
# API destinations keyed by API destination name.
ApiDestinationDict = dict[ApiDestinationName, ApiDestination]
1✔
292

293

294
class EventsStore(BaseStore):
    # Store holding the state of the events service: buses, archives, replays,
    # connections, API destinations and tags. All resource maps are declared as
    # LocalAttribute; only TAGS is declared as CrossRegionAttribute.

    # Map of eventbus names to eventbus objects. The name MUST be unique per account and region (works with AccountRegionBundle)
    event_buses: EventBusDict = LocalAttribute(default=dict)

    # Map of archive names to archive objects. The name MUST be unique per account and region (works with AccountRegionBundle)
    archives: ArchiveDict = LocalAttribute(default=dict)

    # Map of replay names to replay objects. The name MUST be unique per account and region (works with AccountRegionBundle)
    replays: ReplayDict = LocalAttribute(default=dict)

    # Map of connection names to connection objects.
    connections: ConnectionDict = LocalAttribute(default=dict)

    # Map of api destination names to api destination objects
    api_destinations: ApiDestinationDict = LocalAttribute(default=dict)

    # Maps resource ARN to tags
    TAGS: TaggingService = CrossRegionAttribute(default=TaggingService)
1✔
312

313

314
# Module-level bundle of EventsStore instances registered under the service name "events".
# NOTE(review): presumably indexed by account id and then region -- confirm against AccountRegionBundle.
events_stores = AccountRegionBundle("events", EventsStore)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc