• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20565403496

29 Dec 2025 05:11AM UTC coverage: 84.103% (-2.8%) from 86.921%
20565403496

Pull #13567

github

web-flow
Merge 4816837a5 into 2417384aa
Pull Request #13567: Update ASF APIs

67166 of 79862 relevant lines covered (84.1%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.74
/localstack-core/localstack/services/dynamodb/server.py
1
import logging
1✔
2
import os
1✔
3
import threading
1✔
4

5
from localstack import config
1✔
6
from localstack.aws.connect import connect_externally_to
1✔
7
from localstack.aws.forwarder import AwsRequestProxy
1✔
8
from localstack.config import is_env_true, is_persistence_enabled
1✔
9
from localstack.constants import DEFAULT_AWS_ACCOUNT_ID
1✔
10
from localstack.services.dynamodb.packages import dynamodblocal_package
1✔
11
from localstack.utils.common import TMP_THREADS, ShellCommandThread, get_free_tcp_port, mkdir
1✔
12
from localstack.utils.functions import run_safe
1✔
13
from localstack.utils.net import wait_for_port_closed
1✔
14
from localstack.utils.objects import singleton_factory
1✔
15
from localstack.utils.platform import Arch, get_arch
1✔
16
from localstack.utils.run import FuncThread, run
1✔
17
from localstack.utils.serving import Server
1✔
18
from localstack.utils.sync import retry, synchronized
1✔
19

20
LOG = logging.getLogger(__name__)
1✔
21
RESTART_LOCK = threading.RLock()
1✔
22

23

24
def _log_listener(line, **_kwargs):
1✔
25
    LOG.debug(line.rstrip())
1✔
26

27

28
class DynamodbServer(Server):
1✔
29
    db_path: str | None
1✔
30
    heap_size: str
1✔
31

32
    delay_transient_statuses: bool
1✔
33
    optimize_db_before_startup: bool
1✔
34
    share_db: bool
1✔
35
    cors: str | None
1✔
36

37
    proxy: AwsRequestProxy
1✔
38

39
    def __init__(
1✔
40
        self,
41
        port: int | None = None,
42
        host: str = "localhost",
43
        db_path: str | None = None,
44
    ) -> None:
45
        """
46
        Creates a DynamoDB server from the local configuration.
47

48
        :param port: optional, the port to start the server on (defaults to a random port)
49
        :param host: localhost by default
50
        :param db_path: path to the persistence state files used by the DynamoDB Local process
51
        """
52

53
        port = port or get_free_tcp_port()
1✔
54
        super().__init__(port, host)
1✔
55

56
        self.db_path = (
1✔
57
            f"{config.dirs.data}/dynamodb" if not db_path and config.dirs.data else db_path
58
        )
59

60
        # With persistence and MANUAL save strategy, we start DDBLocal from a temporary folder than gets copied over
61
        # the "usual" data directory in the `on_before_state_save` hook (see the provider code for more details).
62
        if is_persistence_enabled() and config.SNAPSHOT_SAVE_STRATEGY == "MANUAL":
1✔
63
            self.db_path = f"{config.dirs.tmp}/dynamodb"
×
64
            LOG.debug(
×
65
                "Persistence save strategy set to MANUAL. The DB path is temporarily stored at: %s",
66
                self.db_path,
67
            )
68

69
        # the DYNAMODB_IN_MEMORY variable takes precedence and will set the DB path to None which forces inMemory=true
70
        if is_env_true("DYNAMODB_IN_MEMORY"):
1✔
71
            # note: with DYNAMODB_IN_MEMORY we do not support persistence
72
            self.db_path = None
×
73

74
        if self.db_path:
1✔
75
            self.db_path = os.path.abspath(self.db_path)
1✔
76

77
        self.heap_size = config.DYNAMODB_HEAP_SIZE
1✔
78
        self.delay_transient_statuses = is_env_true("DYNAMODB_DELAY_TRANSIENT_STATUSES")
1✔
79
        self.optimize_db_before_startup = is_env_true("DYNAMODB_OPTIMIZE_DB_BEFORE_STARTUP")
1✔
80
        self.share_db = is_env_true("DYNAMODB_SHARE_DB")
1✔
81
        self.cors = os.getenv("DYNAMODB_CORS", None)
1✔
82
        self.proxy = AwsRequestProxy(self.url)
1✔
83

84
    @staticmethod
1✔
85
    @singleton_factory
1✔
86
    def get() -> "DynamodbServer":
1✔
87
        return DynamodbServer(config.DYNAMODB_LOCAL_PORT)
1✔
88

89
    @synchronized(lock=RESTART_LOCK)
1✔
90
    def start_dynamodb(self) -> bool:
1✔
91
        """Start the DynamoDB server."""
92

93
        # We want this method to be idempotent.
94
        if self.is_running() and self.is_up():
1✔
95
            return True
×
96

97
        # For the v2 provider, the DynamodbServer has been made a singleton. Yet, the Server abstraction is modelled
98
        # after threading.Thread, where Start -> Stop -> Start is not allowed. This flow happens during state resets.
99
        # The following is a workaround that permits this flow
100
        self._started.clear()
1✔
101
        self._stopped.clear()
1✔
102

103
        # Note: when starting the server, we had a flag for wiping the assets directory before the actual start.
104
        # This behavior was needed in some particular cases:
105
        # - pod load with some assets already lying in the asset folder
106
        # - ...
107
        # The cleaning is now done via the reset endpoint
108
        if self.db_path:
1✔
109
            mkdir(self.db_path)
1✔
110

111
        started = self.start()
1✔
112
        self.wait_for_dynamodb()
1✔
113
        return started
1✔
114

115
    @synchronized(lock=RESTART_LOCK)
1✔
116
    def stop_dynamodb(self) -> None:
1✔
117
        """Stop the DynamoDB server."""
118
        import psutil
×
119

120
        if self._thread is None:
×
121
            return
×
122
        self._thread.auto_restart = False
×
123
        self.shutdown()
×
124
        self.join(timeout=10)
×
125
        try:
×
126
            wait_for_port_closed(self.port, sleep_time=0.8, retries=10)
×
127
        except Exception:
×
128
            LOG.warning(
×
129
                "DynamoDB server port %s (%s) unexpectedly still open; running processes: %s",
130
                self.port,
131
                self._thread,
132
                run(["ps", "aux"]),
133
            )
134

135
            # attempt to terminate/kill the process manually
136
            server_pid = self._thread.process.pid  # noqa
×
137
            LOG.info("Attempting to kill DynamoDB process %s", server_pid)
×
138
            process = psutil.Process(server_pid)
×
139
            run_safe(process.terminate)
×
140
            run_safe(process.kill)
×
141
            wait_for_port_closed(self.port, sleep_time=0.5, retries=8)
×
142

143
    @property
1✔
144
    def in_memory(self) -> bool:
1✔
145
        return self.db_path is None
1✔
146

147
    @property
1✔
148
    def jar_path(self) -> str:
1✔
149
        return f"{dynamodblocal_package.get_installed_dir()}/DynamoDBLocal.jar"
1✔
150

151
    @property
1✔
152
    def library_path(self) -> str:
1✔
153
        return f"{dynamodblocal_package.get_installed_dir()}/DynamoDBLocal_lib"
1✔
154

155
    def _get_java_vm_options(self) -> list[str]:
1✔
156
        # Workaround for JVM SIGILL crash on Apple Silicon M4
157
        # See https://bugs.openjdk.org/browse/JDK-8345296
158
        # To be removed after Java is bumped to 17.0.15+ and 21.0.7+
159
        return ["-XX:UseSVE=0"] if Arch.arm64 == get_arch() else []
1✔
160

161
    def _create_shell_command(self) -> list[str]:
1✔
162
        cmd = [
1✔
163
            "java",
164
            *self._get_java_vm_options(),
165
            f"-Xmx{self.heap_size}",
166
            f"-javaagent:{dynamodblocal_package.get_installer().get_ddb_agent_jar_path()}",
167
            f"-Djava.library.path={self.library_path}",
168
            "-jar",
169
            self.jar_path,
170
        ]
171
        parameters = []
1✔
172

173
        parameters.extend(["-port", str(self.port)])
1✔
174
        if self.in_memory:
1✔
175
            parameters.append("-inMemory")
×
176
        if self.db_path:
1✔
177
            parameters.extend(["-dbPath", self.db_path])
1✔
178
        if self.delay_transient_statuses:
1✔
179
            parameters.extend(["-delayTransientStatuses"])
×
180
        if self.optimize_db_before_startup:
1✔
181
            parameters.extend(["-optimizeDbBeforeStartup"])
×
182
        if self.share_db:
1✔
183
            parameters.extend(["-sharedDb"])
×
184

185
        return cmd + parameters
1✔
186

187
    def do_start_thread(self) -> FuncThread:
1✔
188
        dynamodblocal_installer = dynamodblocal_package.get_installer()
1✔
189
        dynamodblocal_installer.install()
1✔
190

191
        cmd = self._create_shell_command()
1✔
192
        env_vars = {
1✔
193
            **dynamodblocal_installer.get_java_env_vars(),
194
            "DDB_LOCAL_TELEMETRY": "0",
195
        }
196

197
        LOG.debug("Starting DynamoDB Local: %s", cmd)
1✔
198
        t = ShellCommandThread(
1✔
199
            cmd,
200
            strip_color=True,
201
            log_listener=_log_listener,
202
            auto_restart=True,
203
            name="dynamodb-local",
204
            env_vars=env_vars,
205
        )
206
        TMP_THREADS.append(t)
1✔
207
        t.start()
1✔
208
        return t
1✔
209

210
    def check_dynamodb(self, expect_shutdown: bool = False) -> None:
1✔
211
        """Checks if DynamoDB server is up"""
212
        out = None
1✔
213

214
        try:
1✔
215
            self.wait_is_up()
1✔
216
            out = connect_externally_to(
1✔
217
                endpoint_url=self.url,
218
                aws_access_key_id=DEFAULT_AWS_ACCOUNT_ID,
219
                aws_secret_access_key=DEFAULT_AWS_ACCOUNT_ID,
220
            ).dynamodb.list_tables()
221
        except Exception:
×
222
            LOG.error("DynamoDB health check failed", exc_info=LOG.isEnabledFor(logging.DEBUG))
×
223
        if expect_shutdown:
1✔
224
            assert out is None
×
225
        else:
226
            assert isinstance(out["TableNames"], list)
1✔
227

228
    def wait_for_dynamodb(self) -> None:
1✔
229
        retry(self.check_dynamodb, sleep=0.4, retries=10)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc