• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 16820655284

07 Aug 2025 05:03PM UTC coverage: 86.841% (-0.05%) from 86.892%
16820655284

push

github

web-flow
CFNV2: support CDK bootstrap and deployment (#12967)

32 of 38 new or added lines in 5 files covered. (84.21%)

2013 existing lines in 125 files now uncovered.

66606 of 76699 relevant lines covered (86.84%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.24
/localstack-core/localstack/utils/crypto.py
1
import io
1✔
2
import logging
1✔
3
import os
1✔
4
import re
1✔
5
import threading
1✔
6

7
from cryptography.hazmat.backends import default_backend
1✔
8
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
1✔
9

10
from .files import TMP_FILES, file_exists_not_empty, load_file, new_tmp_file, save_file
1✔
11
from .strings import short_uid, to_bytes, to_str
1✔
12
from .sync import synchronized
1✔
13
from .urls import localstack_host
1✔
14

15
LOG = logging.getLogger(__name__)
1✔
16

17
# block size for symmetric encrypt/decrypt operations
18
BLOCK_SIZE = 16
1✔
19

20
# lock for creating certificate files
21
SSL_CERT_LOCK = threading.RLock()
1✔
22

23
# markers that indicate the start/end of sections in PEM cert files
24
PEM_CERT_START = "-----BEGIN CERTIFICATE-----"
1✔
25
PEM_CERT_END = "-----END CERTIFICATE-----"
1✔
26
PEM_KEY_START_REGEX = r"-----BEGIN(.*)PRIVATE KEY-----"
1✔
27
PEM_KEY_END_REGEX = r"-----END(.*)PRIVATE KEY-----"
1✔
28

29

30
@synchronized(lock=SSL_CERT_LOCK)
def generate_ssl_cert(
    target_file=None,
    overwrite=False,
    random=False,
    return_content=False,
    serial_number=None,
):
    """
    Generate a self-signed RSA (2048 bit) server certificate and optionally store it on disk.

    :param target_file: path of the combined PEM file (private key followed by certificate). The key
        and the certificate are additionally stored as ``<target_file>.key`` and
        ``<target_file>.crt``. If falsy, nothing is written and the PEM content is returned.
    :param overwrite: if False and ``target_file`` already exists and is not empty, the existing file
        is split into key/cert files and re-used instead of generating a new certificate
    :param random: if True (and ``target_file`` is set), a random short UID is inserted into the
        target file name
    :param return_content: if True, return the combined PEM content instead of the file names when a
        new certificate is generated
    :param serial_number: serial number of the certificate (defaults to 1001)
    :return: a tuple ``(target_file, cert_file_name, key_file_name)`` if an existing file was re-used,
        or if ``target_file`` is set and ``return_content`` is False; otherwise the combined PEM
        content (key + certificate) as a string
    """
    # Note: Do NOT import "OpenSSL" at the root scope
    # (Our test Lambdas are importing this file but don't have the module installed)
    from OpenSSL import crypto

    def all_exist(*files):
        return all(os.path.exists(f) for f in files)

    def store_cert_key_files(base_filename):
        key_file_name = f"{base_filename}.key"
        cert_file_name = f"{base_filename}.crt"
        # TODO: Cleaner code to load the cert dynamically
        # extract key and cert from target_file and store into separate files
        # (note: the content is always read from `target_file`, only the output
        # file names are derived from `base_filename`)
        content = load_file(target_file)
        key_start = re.search(PEM_KEY_START_REGEX, content)
        key_start = key_start.group(0)
        key_end = re.search(PEM_KEY_END_REGEX, content)
        key_end = key_end.group(0)
        key_content = content[content.index(key_start) : content.index(key_end) + len(key_end)]
        # use rindex for the end marker, to include all certificates contained in the file
        cert_content = content[
            content.index(PEM_CERT_START) : content.rindex(PEM_CERT_END) + len(PEM_CERT_END)
        ]
        save_file(key_file_name, key_content)
        save_file(cert_file_name, cert_content)
        return cert_file_name, key_file_name

    # re-use an existing cert file, if present and overwriting was not requested
    if target_file and not overwrite and file_exists_not_empty(target_file):
        try:
            cert_file_name, key_file_name = store_cert_key_files(target_file)
        except Exception as e:
            # fall back to temporary files if we cannot store/overwrite the files above
            LOG.info(
                "Error storing key/cert SSL files (falling back to random tmp file names): %s", e
            )
            target_file_tmp = new_tmp_file()
            cert_file_name, key_file_name = store_cert_key_files(target_file_tmp)
        if all_exist(cert_file_name, key_file_name):
            return target_file, cert_file_name, key_file_name

    if random and target_file:
        if "." in target_file:
            target_file = target_file.replace(".", ".%s." % short_uid(), 1)
        else:
            target_file = f"{target_file}.{short_uid()}"

    # create a key pair
    k = crypto.PKey()
    k.generate_key(crypto.TYPE_RSA, 2048)

    host_definition = localstack_host()

    # create a self-signed cert
    cert = crypto.X509()
    subj = cert.get_subject()
    subj.C = "AU"
    subj.ST = "Some-State"
    subj.L = "Some-Locality"
    subj.O = "LocalStack Org"  # noqa
    subj.OU = "Testing"
    subj.CN = "localhost"
    # Note: new requirements for recent OSX versions: https://support.apple.com/en-us/HT210176
    # More details: https://www.iol.unh.edu/blog/2019/10/10/macos-catalina-and-chrome-trust
    serial_number = serial_number or 1001
    cert.set_version(2)
    cert.set_serial_number(serial_number)
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(2 * 365 * 24 * 60 * 60)  # valid for two years
    cert.set_issuer(cert.get_subject())
    cert.set_pubkey(k)
    # Subject alternative names are a comma-separated list of `<type>:<value>` entries. Build the
    # list explicitly so that every entry (in particular the trailing IP entry) is properly
    # separated from the configured LocalStack host name.
    alt_names = ",".join(
        [
            "DNS:localhost",
            "DNS:test.localhost.atlassian.io",
            "DNS:localhost.localstack.cloud",
            f"DNS:{host_definition.host}",
            "IP:127.0.0.1",
        ]
    ).encode()
    cert.add_extensions(
        [
            crypto.X509Extension(b"subjectAltName", False, alt_names),
            crypto.X509Extension(b"basicConstraints", True, b"CA:false"),
            crypto.X509Extension(
                b"keyUsage", True, b"nonRepudiation,digitalSignature,keyEncipherment"
            ),
            crypto.X509Extension(b"extendedKeyUsage", True, b"serverAuth"),
        ]
    )
    cert.sign(k, "SHA256")

    cert_file = io.StringIO()
    key_file = io.StringIO()
    cert_file.write(to_str(crypto.dump_certificate(crypto.FILETYPE_PEM, cert)))
    key_file.write(to_str(crypto.dump_privatekey(crypto.FILETYPE_PEM, k)))
    cert_file_content = cert_file.getvalue().strip()
    key_file_content = key_file.getvalue().strip()
    # combined PEM content: private key first, then the certificate
    file_content = f"{key_file_content}\n{cert_file_content}"
    if target_file:
        key_file_name = f"{target_file}.key"
        cert_file_name = f"{target_file}.crt"
        # check existence to avoid permission denied issues:
        # https://github.com/localstack/localstack/issues/1607
        if not all_exist(target_file, key_file_name, cert_file_name):
            # first attempt uses the requested location, second attempt uses a tmp file
            for attempt in range(2):
                try:
                    save_file(target_file, file_content)
                    save_file(key_file_name, key_file_content)
                    save_file(cert_file_name, cert_file_content)
                    break
                except Exception as e:
                    if attempt > 0:
                        raise
                    LOG.info(
                        "Unable to store certificate file under %s, using tmp file instead: %s",
                        target_file,
                        e,
                    )
                    # Fix for https://github.com/localstack/localstack/issues/1743
                    target_file = f"{new_tmp_file()}.pem"
                    key_file_name = f"{target_file}.key"
                    cert_file_name = f"{target_file}.crt"
            TMP_FILES.append(target_file)
            TMP_FILES.append(key_file_name)
            TMP_FILES.append(cert_file_name)
        if not return_content:
            return target_file, cert_file_name, key_file_name
    return file_content
1✔
157

158

159
def pad(s: bytes) -> bytes:
    """
    Pad the given bytes up to the next multiple of ``BLOCK_SIZE``.

    Appends ``n`` characters with code point ``n`` (PKCS#7-style), where ``n`` is the number of
    bytes missing to the next block boundary. Input that is already block-aligned receives a full
    additional block of padding, so that the padding can always be removed again by ``unpad``.
    """
    pad_len = BLOCK_SIZE - len(s) % BLOCK_SIZE
    padding = chr(pad_len) * pad_len
    return s + to_bytes(padding)
1✔
161

162

163
def unpad(s: bytes) -> bytes:
    """
    Strip the trailing padding from the given bytes.

    The value of the last byte is interpreted as the number of padding bytes to cut off the end.
    Note that the padding bytes themselves are not validated.
    """
    pad_len = s[-1]
    return s[:-pad_len]
1✔
165

166

167
def encrypt(key: bytes, message: bytes, iv: bytes = None, aad: bytes = None) -> tuple[bytes, bytes]:
    """
    Encrypt the given message with AES in GCM mode.

    The message is padded via ``pad`` before encryption, hence the ciphertext needs to be passed
    through ``decrypt`` (which removes the padding again) to recover the original message.

    :param key: the AES key
    :param message: the plaintext to encrypt
    :param iv: initialization vector; falls back to ``BLOCK_SIZE`` times ``b"0"`` if not specified
    :param aad: optional additional authenticated data; if None, no additional data is authenticated
    :return: tuple ``(ciphertext, authentication tag)``
    """
    # NOTE(review): the fallback IV is a fixed value - re-using an IV with the same key weakens GCM,
    # callers that care about confidentiality should pass a unique IV per message
    iv = iv or b"0" * BLOCK_SIZE
    cipher = Cipher(algorithms.AES(key), modes.GCM(iv), backend=default_backend())
    encryptor = cipher.encryptor()
    # `aad` defaults to None, which is not accepted as bytes-like data by the cipher context
    if aad is not None:
        encryptor.authenticate_additional_data(aad)
    encrypted = encryptor.update(pad(message)) + encryptor.finalize()
    return encrypted, encryptor.tag
1✔
174

175

176
def decrypt(
    key: bytes, encrypted: bytes, iv: bytes = None, tag: bytes = None, aad: bytes = None
) -> bytes:
    """
    Decrypt a ciphertext that was created with ``encrypt`` (AES in GCM mode), and remove the padding.

    :param key: the AES key
    :param encrypted: the ciphertext to decrypt
    :param iv: initialization vector; falls back to ``BLOCK_SIZE`` times ``b"0"`` if not specified
    :param tag: the authentication tag, as returned by ``encrypt``
    :param aad: optional additional authenticated data; if None, no additional data is authenticated
    :return: the decrypted message, with the padding removed via ``unpad``
    """
    iv = iv or b"0" * BLOCK_SIZE
    cipher = Cipher(algorithms.AES(key), modes.GCM(iv, tag), backend=default_backend())
    decryptor = cipher.decryptor()
    # `aad` defaults to None, which is not accepted as bytes-like data by the cipher context
    if aad is not None:
        decryptor.authenticate_additional_data(aad)
    # NOTE(review): finalize() presumably raises if the tag does not match - confirm against the
    # `cryptography` documentation
    decrypted = decryptor.update(encrypted) + decryptor.finalize()
    return unpad(decrypted)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc