• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19416611244

14 Nov 2025 11:19AM UTC coverage: 86.902% (+0.009%) from 86.893%
19416611244

push

github

web-flow
Update README.md for 4.10 (#13378)

68616 of 78958 relevant lines covered (86.9%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.41
/localstack-core/localstack/utils/crypto.py
1
import io
1✔
2
import logging
1✔
3
import os
1✔
4
import re
1✔
5
import threading
1✔
6

7
from asn1crypto import algos, cms, core
1✔
8
from asn1crypto import x509 as asn1_x509
1✔
9
from cryptography.hazmat.backends import default_backend
1✔
10
from cryptography.hazmat.primitives import hashes
1✔
11
from cryptography.hazmat.primitives import padding as sym_padding
1✔
12
from cryptography.hazmat.primitives.asymmetric import padding
1✔
13
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
1✔
14
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
1✔
15

16
from .files import TMP_FILES, file_exists_not_empty, load_file, new_tmp_file, save_file
1✔
17
from .strings import short_uid, to_bytes, to_str
1✔
18
from .sync import synchronized
1✔
19
from .urls import localstack_host
1✔
20

21
LOG = logging.getLogger(__name__)
1✔
22

23
# block size for symmetric encrypt/decrypt operations
24
BLOCK_SIZE = 16
1✔
25

26
# lock for creating certificate files
27
SSL_CERT_LOCK = threading.RLock()
1✔
28

29
# markers that indicate the start/end of sections in PEM cert files
30
PEM_CERT_START = "-----BEGIN CERTIFICATE-----"
1✔
31
PEM_CERT_END = "-----END CERTIFICATE-----"
1✔
32
PEM_KEY_START_REGEX = r"-----BEGIN(.*)PRIVATE KEY-----"
1✔
33
PEM_KEY_END_REGEX = r"-----END(.*)PRIVATE KEY-----"
1✔
34

35
OID_AES256_CBC = "2.16.840.1.101.3.4.1.42"
1✔
36
OID_MGF1 = "1.2.840.113549.1.1.8"
1✔
37
OID_RSAES_OAEP = "1.2.840.113549.1.1.7"
1✔
38
OID_SHA256 = "2.16.840.1.101.3.4.2.1"
1✔
39

40

41
@synchronized(lock=SSL_CERT_LOCK)
1✔
42
def generate_ssl_cert(
1✔
43
    target_file=None,
44
    overwrite=False,
45
    random=False,
46
    return_content=False,
47
    serial_number=None,
48
):
49
    # Note: Do NOT import "OpenSSL" at the root scope
50
    # (Our test Lambdas are importing this file but don't have the module installed)
51
    from OpenSSL import crypto
1✔
52

53
    def all_exist(*files):
1✔
54
        return all(os.path.exists(f) for f in files)
1✔
55

56
    def store_cert_key_files(base_filename):
1✔
57
        key_file_name = f"{base_filename}.key"
1✔
58
        cert_file_name = f"{base_filename}.crt"
1✔
59
        # TODO: Cleaner code to load the cert dynamically
60
        # extract key and cert from target_file and store into separate files
61
        content = load_file(target_file)
1✔
62
        key_start = re.search(PEM_KEY_START_REGEX, content)
1✔
63
        key_start = key_start.group(0)
1✔
64
        key_end = re.search(PEM_KEY_END_REGEX, content)
1✔
65
        key_end = key_end.group(0)
1✔
66
        key_content = content[content.index(key_start) : content.index(key_end) + len(key_end)]
1✔
67
        cert_content = content[
1✔
68
            content.index(PEM_CERT_START) : content.rindex(PEM_CERT_END) + len(PEM_CERT_END)
69
        ]
70
        save_file(key_file_name, key_content)
1✔
71
        save_file(cert_file_name, cert_content)
1✔
72
        return cert_file_name, key_file_name
1✔
73

74
    if target_file and not overwrite and file_exists_not_empty(target_file):
1✔
75
        try:
1✔
76
            cert_file_name, key_file_name = store_cert_key_files(target_file)
1✔
77
        except Exception as e:
×
78
            # fall back to temporary files if we cannot store/overwrite the files above
79
            LOG.info(
×
80
                "Error storing key/cert SSL files (falling back to random tmp file names): %s", e
81
            )
82
            target_file_tmp = new_tmp_file()
×
83
            cert_file_name, key_file_name = store_cert_key_files(target_file_tmp)
×
84
        if all_exist(cert_file_name, key_file_name):
1✔
85
            return target_file, cert_file_name, key_file_name
1✔
86
    if random and target_file:
1✔
87
        if "." in target_file:
×
88
            target_file = target_file.replace(".", f".{short_uid()}.", 1)
×
89
        else:
90
            target_file = f"{target_file}.{short_uid()}"
×
91

92
    # create a key pair
93
    k = crypto.PKey()
1✔
94
    k.generate_key(crypto.TYPE_RSA, 2048)
1✔
95

96
    host_definition = localstack_host()
1✔
97

98
    # create a self-signed cert
99
    cert = crypto.X509()
1✔
100
    subj = cert.get_subject()
1✔
101
    subj.C = "AU"
1✔
102
    subj.ST = "Some-State"
1✔
103
    subj.L = "Some-Locality"
1✔
104
    subj.O = "LocalStack Org"  # noqa
1✔
105
    subj.OU = "Testing"
1✔
106
    subj.CN = "localhost"
1✔
107
    # Note: new requirements for recent OSX versions: https://support.apple.com/en-us/HT210176
108
    # More details: https://www.iol.unh.edu/blog/2019/10/10/macos-catalina-and-chrome-trust
109
    serial_number = serial_number or 1001
1✔
110
    cert.set_version(2)
1✔
111
    cert.set_serial_number(serial_number)
1✔
112
    cert.gmtime_adj_notBefore(0)
1✔
113
    cert.gmtime_adj_notAfter(2 * 365 * 24 * 60 * 60)
1✔
114
    cert.set_issuer(cert.get_subject())
1✔
115
    cert.set_pubkey(k)
1✔
116
    alt_names = (
1✔
117
        f"DNS:localhost,DNS:test.localhost.atlassian.io,DNS:localhost.localstack.cloud,DNS:{host_definition.host}IP:127.0.0.1"
118
    ).encode()
119
    cert.add_extensions(
1✔
120
        [
121
            crypto.X509Extension(b"subjectAltName", False, alt_names),
122
            crypto.X509Extension(b"basicConstraints", True, b"CA:false"),
123
            crypto.X509Extension(
124
                b"keyUsage", True, b"nonRepudiation,digitalSignature,keyEncipherment"
125
            ),
126
            crypto.X509Extension(b"extendedKeyUsage", True, b"serverAuth"),
127
        ]
128
    )
129
    cert.sign(k, "SHA256")
1✔
130

131
    cert_file = io.StringIO()
1✔
132
    key_file = io.StringIO()
1✔
133
    cert_file.write(to_str(crypto.dump_certificate(crypto.FILETYPE_PEM, cert)))
1✔
134
    key_file.write(to_str(crypto.dump_privatekey(crypto.FILETYPE_PEM, k)))
1✔
135
    cert_file_content = cert_file.getvalue().strip()
1✔
136
    key_file_content = key_file.getvalue().strip()
1✔
137
    file_content = f"{key_file_content}\n{cert_file_content}"
1✔
138
    if target_file:
1✔
139
        key_file_name = f"{target_file}.key"
1✔
140
        cert_file_name = f"{target_file}.crt"
1✔
141
        # check existence to avoid permission denied issues:
142
        # https://github.com/localstack/localstack/issues/1607
143
        if not all_exist(target_file, key_file_name, cert_file_name):
1✔
144
            for i in range(2):
1✔
145
                try:
1✔
146
                    save_file(target_file, file_content)
1✔
147
                    save_file(key_file_name, key_file_content)
1✔
148
                    save_file(cert_file_name, cert_file_content)
1✔
149
                    break
1✔
150
                except Exception as e:
×
151
                    if i > 0:
×
152
                        raise
×
153
                    LOG.info(
×
154
                        "Unable to store certificate file under %s, using tmp file instead: %s",
155
                        target_file,
156
                        e,
157
                    )
158
                    # Fix for https://github.com/localstack/localstack/issues/1743
159
                    target_file = f"{new_tmp_file()}.pem"
×
160
                    key_file_name = f"{target_file}.key"
×
161
                    cert_file_name = f"{target_file}.crt"
×
162
            TMP_FILES.append(target_file)
1✔
163
            TMP_FILES.append(key_file_name)
1✔
164
            TMP_FILES.append(cert_file_name)
1✔
165
        if not return_content:
1✔
166
            return target_file, cert_file_name, key_file_name
1✔
167
    return file_content
1✔
168

169

170
def pad(s: bytes) -> bytes:
1✔
171
    return s + to_bytes((BLOCK_SIZE - len(s) % BLOCK_SIZE) * chr(BLOCK_SIZE - len(s) % BLOCK_SIZE))
1✔
172

173

174
def unpad(s: bytes) -> bytes:
1✔
175
    return s[0 : -s[-1]]
1✔
176

177

178
def encrypt(key: bytes, message: bytes, iv: bytes = None, aad: bytes = None) -> tuple[bytes, bytes]:
1✔
179
    iv = iv or b"0" * BLOCK_SIZE
1✔
180
    cipher = Cipher(algorithms.AES(key), modes.GCM(iv), backend=default_backend())
1✔
181
    encryptor = cipher.encryptor()
1✔
182
    encryptor.authenticate_additional_data(aad)
1✔
183
    encrypted = encryptor.update(pad(message)) + encryptor.finalize()
1✔
184
    return encrypted, encryptor.tag
1✔
185

186

187
def decrypt(
1✔
188
    key: bytes, encrypted: bytes, iv: bytes = None, tag: bytes = None, aad: bytes = None
189
) -> bytes:
190
    iv = iv or b"0" * BLOCK_SIZE
1✔
191
    cipher = Cipher(algorithms.AES(key), modes.GCM(iv, tag), backend=default_backend())
1✔
192
    decryptor = cipher.decryptor()
1✔
193
    decryptor.authenticate_additional_data(aad)
1✔
194
    decrypted = decryptor.update(encrypted) + decryptor.finalize()
1✔
195
    decrypted = unpad(decrypted)
1✔
196
    return decrypted
1✔
197

198

199
def pkcs7_envelope_encrypt(plaintext: bytes, recipient_pubkey: RSAPublicKey) -> bytes:
1✔
200
    """
201
    Create a PKCS7 wrapper of some plaintext decryptable by recipient_pubkey.  Uses RSA-OAEP with SHA-256
202
    to encrypt the AES-256-CBC content key.  Hazmat's PKCS7EnvelopeBuilder doesn't support RSA-OAEP with SHA-256,
203
    so we need to build the pieces manually and then put them together in an envelope with asn1crypto.
204
    """
205

206
    # Encrypt the plaintext with an AES session key, then encrypt the session key to the recipient_pubkey
207
    session_key = os.urandom(32)
1✔
208
    iv = os.urandom(16)
1✔
209
    encrypted_session_key = recipient_pubkey.encrypt(
1✔
210
        session_key,
211
        padding.OAEP(
212
            mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None
213
        ),
214
    )
215
    cipher = Cipher(algorithms.AES(session_key), modes.CBC(iv), backend=default_backend())
1✔
216
    encryptor = cipher.encryptor()
1✔
217
    padder = sym_padding.PKCS7(algorithms.AES.block_size).padder()
1✔
218
    padded_plaintext = padder.update(plaintext) + padder.finalize()
1✔
219
    encrypted_content = encryptor.update(padded_plaintext) + encryptor.finalize()
1✔
220

221
    # Now put together the envelope.
222
    # Add the recipient with their copy of the session key
223
    recipient_identifier = cms.RecipientIdentifier(
1✔
224
        name="issuer_and_serial_number",
225
        value=cms.IssuerAndSerialNumber(
226
            {
227
                "issuer": asn1_x509.Name.build({"common_name": "recipient"}),
228
                "serial_number": 1,
229
            }
230
        ),
231
    )
232
    key_enc_algorithm = cms.KeyEncryptionAlgorithm(
1✔
233
        {
234
            "algorithm": OID_RSAES_OAEP,
235
            "parameters": algos.RSAESOAEPParams(
236
                {
237
                    "hash_algorithm": algos.DigestAlgorithm(
238
                        {
239
                            "algorithm": OID_SHA256,
240
                        }
241
                    ),
242
                    "mask_gen_algorithm": algos.MaskGenAlgorithm(
243
                        {
244
                            "algorithm": OID_MGF1,
245
                            "parameters": algos.DigestAlgorithm(
246
                                {
247
                                    "algorithm": OID_SHA256,
248
                                }
249
                            ),
250
                        }
251
                    ),
252
                }
253
            ),
254
        }
255
    )
256
    recipient_info = cms.KeyTransRecipientInfo(
1✔
257
        {
258
            "version": "v0",
259
            "rid": recipient_identifier,
260
            "key_encryption_algorithm": key_enc_algorithm,
261
            "encrypted_key": encrypted_session_key,
262
        }
263
    )
264

265
    # Add the encrypted content
266
    content_enc_algorithm = cms.EncryptionAlgorithm(
1✔
267
        {
268
            "algorithm": OID_AES256_CBC,
269
            "parameters": core.OctetString(iv),
270
        }
271
    )
272
    encrypted_content_info = cms.EncryptedContentInfo(
1✔
273
        {
274
            "content_type": "data",
275
            "content_encryption_algorithm": content_enc_algorithm,
276
            "encrypted_content": encrypted_content,
277
        }
278
    )
279
    enveloped_data = cms.EnvelopedData(
1✔
280
        {
281
            "version": "v0",
282
            "recipient_infos": [recipient_info],
283
            "encrypted_content_info": encrypted_content_info,
284
        }
285
    )
286

287
    # Finally add a wrapper and return its bytes
288
    content_info = cms.ContentInfo(
1✔
289
        {
290
            "content_type": "enveloped_data",
291
            "content": enveloped_data,
292
        }
293
    )
294
    return content_info.dump()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc