• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zappa / Zappa / 15524875889

09 Jun 2025 01:38AM UTC coverage: 75.123% (+0.3%) from 74.862%
15524875889

Pull #1384

github

web-flow
Merge 7865a6116 into c2a8e2077
Pull Request #1384: :fire: :wrench: Remove/depreciated pkg_resource usage and other depreciated usage

79 of 145 new or added lines in 3 files covered. (54.48%)

20 existing lines in 4 files now uncovered.

2890 of 3847 relevant lines covered (75.12%)

3.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.83
/zappa/letsencrypt.py
1
#!/usr/bin/env python
2
"""
1✔
3
Create and install a Let's Encrypt cert for an API Gateway.
4

5
This file is a descendant of @diafygi's 'acme-tiny',
6
with http-01 replaced with dns-01 via AWS Route 53.
7

8
You must generate your own account.key:
9
openssl genrsa 2048 > account.key # Keep it secret, keep safe!
10

11
"""
12

13
import atexit
5✔
14
import base64
5✔
15
import binascii
5✔
16
import copy
5✔
17
import hashlib
5✔
18
import json
5✔
19
import logging
5✔
20
import os
5✔
21
import re
5✔
22
import shutil
5✔
23
import subprocess
5✔
24
import tempfile
5✔
25
import textwrap
5✔
26
import time
5✔
27
from urllib.request import urlopen
5✔
28

29
import requests
5✔
30

31
# Staging
32
# Amazon doesn't accept these though.
33
# DEFAULT_CA = "https://acme-staging.api.letsencrypt.org"
34

35
# Production
36
DEFAULT_CA = "https://acme-v02.api.letsencrypt.org"
5✔
37

38
LOGGER = logging.getLogger(__name__)
5✔
39
LOGGER.addHandler(logging.StreamHandler())
5✔
40

41

42
def get_cert_and_update_domain(
    zappa_instance,
    lambda_name,
    api_stage,
    domain=None,
    manual=False,
):
    """
    Main cert installer path.

    Generates a domain key and CSR, obtains a signed certificate through
    get_cert(), builds the chained certificate, and then either:

    * prints the certificate body, private key and chain when `manual` is
      true, or
    * creates (or updates, if it already exists) the custom domain name on
      `zappa_instance` when `domain` is given.

    Returns True on success. Any exception raised along the way is printed
    and reported to the caller as a False return value.
    """

    def _read_artifact(filename):
        # Every intermediate artifact lives in the module's temp directory.
        with open(os.path.join(gettempdir(), filename)) as f:
            return f.read()

    try:
        create_domain_key()
        create_domain_csr(domain)
        get_cert(zappa_instance)
        create_chained_certificate()

        certificate_body = _read_artifact("signed.crt")
        certificate_private_key = _read_artifact("domain.key")
        certificate_chain = _read_artifact("intermediate.pem")

        if manual:
            print("Certificate body:\n")
            print(certificate_body)

            print("\nCertificate private key:\n")
            print(certificate_private_key)

            print("\nCertificate chain:\n")
            print(certificate_chain)
        elif domain:
            # Identical arguments are used for both create and update.
            domain_kwargs = dict(
                domain_name=domain,
                certificate_name=domain + "-Zappa-LE-Cert",
                certificate_body=certificate_body,
                certificate_private_key=certificate_private_key,
                certificate_chain=certificate_chain,
                certificate_arn=None,
                lambda_name=lambda_name,
                stage=api_stage,
            )
            if not zappa_instance.get_domain_name(domain):
                zappa_instance.create_domain_name(**domain_kwargs)
                print(
                    "Created a new domain name. "
                    "Please note that it can take up to 40 minutes "
                    "for this domain to be created and propagated through AWS, "
                    "but it requires no further work on your part."
                )
            else:
                zappa_instance.update_domain_name(**domain_kwargs)

    except Exception as e:
        # Top-level boundary: callers only look at the boolean result.
        print(e)
        return False

    return True
×
113

114

115
def create_domain_key():
    """
    Generate a 2048-bit RSA private key with the `openssl` command line tool
    and write it to `domain.key` in the module's temporary directory.

    Raises subprocess.CalledProcessError if openssl exits with a non-zero
    status.
    """
    # subprocess.DEVNULL silences openssl's stderr without opening (and
    # leaking) a file handle on os.devnull.
    out = subprocess.check_output(["openssl", "genrsa", "2048"], stderr=subprocess.DEVNULL)
    with open(os.path.join(gettempdir(), "domain.key"), "wb") as f:
        f.write(out)
5✔
120

121

122
def create_domain_csr(domain):
    """
    Create a SHA-256 certificate signing request for `domain`.

    The CSR is signed with `domain.key` from the module's temporary directory
    and written to `domain.csr` in the same directory. `domain` becomes the
    subject common name and must be a string.

    Raises subprocess.CalledProcessError if openssl exits with a non-zero
    status.
    """
    subj = "/CN=" + domain
    cmd = [
        "openssl",
        "req",
        "-new",
        "-sha256",
        "-key",
        os.path.join(gettempdir(), "domain.key"),
        "-subj",
        subj,
    ]

    # subprocess.DEVNULL avoids leaving an unclosed handle on os.devnull.
    out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL)
    with open(os.path.join(gettempdir(), "domain.csr"), "wb") as f:
        f.write(out)
5✔
139

140

141
def create_chained_certificate():
    """
    Download the Let's Encrypt cross-signed intermediate certificate and
    build the certificate chain files in the module's temporary directory.

    Reads `signed.crt`, writes the downloaded intermediate to
    `intermediate.pem`, and writes `chained.pem` as the signed certificate
    followed by the intermediate.
    """
    # Read through a context manager so the handle is closed promptly.
    with open(os.path.join(gettempdir(), "signed.crt"), "rb") as signed_crt_file:
        signed_crt = signed_crt_file.read()

    cross_cert_url = "https://letsencrypt.org/certs/lets-encrypt-x3-cross-signed.pem"
    # NOTE(review): the HTTP status is not checked, so an error page would be
    # written out as the intermediate certificate - confirm whether that is acceptable.
    cert = requests.get(cross_cert_url)
    with open(os.path.join(gettempdir(), "intermediate.pem"), "wb") as intermediate_pem:
        intermediate_pem.write(cert.content)

    with open(os.path.join(gettempdir(), "chained.pem"), "wb") as chained_pem:
        chained_pem.write(signed_crt)
        chained_pem.write(cert.content)
5✔
152

153

154
def parse_account_key():
    """
    Parse account key to get public key.

    Runs `openssl rsa -noout -text` on `account.key` in the module's
    temporary directory and returns openssl's textual dump as bytes.

    Raises subprocess.CalledProcessError if openssl exits with a non-zero
    status.
    """
    LOGGER.info("Parsing account key...")
    cmd = [
        "openssl",
        "rsa",
        "-in",
        os.path.join(gettempdir(), "account.key"),
        "-noout",
        "-text",
    ]
    # subprocess.DEVNULL avoids leaving an unclosed handle on os.devnull.
    return subprocess.check_output(cmd, stderr=subprocess.DEVNULL)
5✔
167

168

169
def parse_csr():
    """
    Parse certificate signing request for domains.

    Runs `openssl req -noout -text` on `domain.csr` in the module's temporary
    directory and returns the set of domain names found in the subject
    common name and in any `DNS:` subject alternative names.

    Raises subprocess.CalledProcessError if openssl exits with a non-zero
    status.
    """
    LOGGER.info("Parsing CSR...")
    cmd = [
        "openssl",
        "req",
        "-in",
        os.path.join(gettempdir(), "domain.csr"),
        "-noout",
        "-text",
    ]
    # subprocess.DEVNULL avoids leaving an unclosed handle on os.devnull;
    # decode once and reuse the text for both searches.
    csr_text = subprocess.check_output(cmd, stderr=subprocess.DEVNULL).decode("utf8")

    domains = set()
    common_name = re.search(r"Subject:.*? CN\s?=\s?([^\s,;/]+)", csr_text)
    if common_name is not None:
        domains.add(common_name.group(1))

    subject_alt_names = re.search(
        r"X509v3 Subject Alternative Name: \n +([^\n]+)\n",
        csr_text,
        re.MULTILINE | re.DOTALL,
    )
    if subject_alt_names is not None:
        for san in subject_alt_names.group(1).split(", "):
            if san.startswith("DNS:"):
                # Drop the "DNS:" prefix, keeping only the host name.
                domains.add(san[4:])

    return domains
5✔
199

200

201
def get_boulder_header(key_bytes):
    """
    Use regular expressions to find crypto values from parsed account key,
    and return a header we can send to our Boulder instance.

    `key_bytes` is the UTF-8 encoded text dump of the account key, as
    returned by parse_account_key(). The result is a dict with "alg" set to
    "RS256" and a "jwk" entry holding the base64url-encoded public exponent
    ("e") and modulus ("n").

    Raises ValueError if the modulus and public exponent cannot be located
    in `key_bytes`.
    """
    match = re.search(
        r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)",
        key_bytes.decode("utf8"),
        re.MULTILINE | re.DOTALL,
    )
    if match is None:
        # Fail with a meaningful error instead of AttributeError on None.
        raise ValueError("Could not find modulus and public exponent in account key output")
    pub_hex, pub_exp = match.groups()

    # Hex-encode the exponent, left-padded to a whole number of bytes.
    pub_exp = "{0:x}".format(int(pub_exp))
    pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
    header = {
        "alg": "RS256",
        "jwk": {
            "e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
            "kty": "RSA",
            # Strip the colons and whitespace openssl puts between hex bytes.
            "n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
        },
    }

    return header
5✔
223

224

225
def register_account():
    """
    Register the account key with the CA, agreeing to the LE TOS.

    A 201 response means a new registration and 409 means the key was
    already registered; any other status raises ValueError.
    """
    LOGGER.info("Registering account...")
    registration = {
        "resource": "new-reg",
        "agreement": "https://letsencrypt.org/documents/LE-SA-v1.2-November-15-2017.pdf",
    }
    code, result = _send_signed_request(DEFAULT_CA + "/acme/new-reg", registration)

    if code not in (201, 409):  # pragma: no cover
        raise ValueError("Error registering: {0} {1}".format(code, result))
    LOGGER.info("Registered!" if code == 201 else "Already registered!")  # pragma: no cover
243

244

245
def get_cert(zappa_instance, log=LOGGER, CA=DEFAULT_CA):
    """
    Call LE to get a new signed certificate.

    For every domain found in the CSR this requests a dns-01 challenge,
    publishes the challenge digest as a TXT record through `zappa_instance`,
    waits for DNS propagation, triggers the challenge and polls until it is
    verified. The certificate is then signed and written out in PEM form.

    `log` receives the per-domain progress message. `CA` is the base URL used
    for the new-authz request only; register_account() and
    sign_certificate() always talk to DEFAULT_CA.

    Returns True on success. Raises ValueError when the CA rejects a request,
    offers no dns-01 challenge, or no hosted zone is found for a domain.
    """
    out = parse_account_key()
    header = get_boulder_header(out)
    # The key thumbprint is the SHA-256 of the canonical (sorted, compact) JWK.
    accountkey_json = json.dumps(header["jwk"], sort_keys=True, separators=(",", ":"))
    thumbprint = _b64(hashlib.sha256(accountkey_json.encode("utf8")).digest())

    # find domains
    domains = parse_csr()

    # make sure the account key is registered with the CA
    register_account()

    # verify each domain
    for domain in domains:
        log.info("Verifying {0}...".format(domain))

        # get new challenge
        code, result = _send_signed_request(
            CA + "/acme/new-authz",
            {
                "resource": "new-authz",
                "identifier": {"type": "dns", "value": domain},
            },
        )
        if code != 201:
            raise ValueError("Error requesting challenges: {0} {1}".format(code, result))

        dns_challenges = [ch for ch in json.loads(result.decode("utf8"))["challenges"] if ch["type"] == "dns-01"]
        if not dns_challenges:
            # Clearer than the IndexError an empty list would otherwise produce.
            raise ValueError("No dns-01 challenge offered for: " + domain)
        challenge = dns_challenges[0]
        token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge["token"])
        keyauthorization = "{0}.{1}".format(token, thumbprint).encode("utf-8")

        # sha256_b64
        digest = _b64(hashlib.sha256(keyauthorization).digest())

        zone_id = zappa_instance.get_hosted_zone_id_for_domain(domain)
        if not zone_id:
            raise ValueError("Could not find Zone ID for: " + domain)
        zappa_instance.set_dns_challenge_txt(zone_id, domain, digest)  # resp is unused

        try:
            print("Waiting for DNS to propagate..")

            # What's optimal here?
            time.sleep(45)

            # notify challenge are met
            code, result = _send_signed_request(
                challenge["uri"],
                {
                    "resource": "challenge",
                    "keyAuthorization": keyauthorization.decode("utf-8"),
                },
            )
            if code != 202:
                raise ValueError("Error triggering challenge: {0} {1}".format(code, result))

            # wait for challenge to be verified
            verify_challenge(challenge["uri"])
        finally:
            # Clean up R53 whether or not the challenge passed, so a failed
            # attempt does not leave a stale challenge record behind.
            zappa_instance.remove_dns_challenge_txt(zone_id, domain, digest)

    # Sign
    result = sign_certificate()
    # Encode to PEM format
    encode_certificate(result)

    return True
×
316

317

318
def verify_challenge(uri):
    """
    Loop until our challenge is verified, else fail.

    Polls `uri` every two seconds while the reported status is "pending" and
    returns once it becomes "valid". There is no upper bound on the number of
    polls.

    Raises ValueError if the status cannot be fetched or if the challenge
    ends in any state other than "valid".
    """
    while True:
        try:
            resp = urlopen(uri)
            challenge_status = json.loads(resp.read().decode("utf8"))
        except IOError as e:
            # Only HTTPError carries a status code and a response body; a
            # plain URLError or socket error has neither, so fall back to
            # the exception text instead of failing with AttributeError.
            code = getattr(e, "code", None)
            read_body = getattr(e, "read", None)
            detail = str(e)
            if read_body is not None:
                body = read_body().decode("utf8")
                try:
                    detail = json.loads(body)
                except ValueError:
                    # Error body was not JSON; report it verbatim.
                    detail = body
            raise ValueError("Error checking challenge: {0} {1}".format(code, detail)) from e

        status = challenge_status["status"]
        if status == "pending":
            time.sleep(2)
        elif status == "valid":
            LOGGER.info("Domain verified!")
            break
        else:
            raise ValueError("Domain challenge did not pass: {0}".format(challenge_status))
×
335

336

337
def sign_certificate():
    """
    Get the new certificate.

    Converts `domain.csr` from the module's temporary directory to DER with
    openssl and submits it to the CA's new-cert endpoint.

    Returns the signed bytes.

    Raises ValueError if the CA does not answer with status 201, and
    subprocess.CalledProcessError if openssl exits with a non-zero status.
    """
    LOGGER.info("Signing certificate...")
    cmd = [
        "openssl",
        "req",
        "-in",
        os.path.join(gettempdir(), "domain.csr"),
        "-outform",
        "DER",
    ]
    # subprocess.DEVNULL avoids leaving an unclosed handle on os.devnull.
    csr_der = subprocess.check_output(cmd, stderr=subprocess.DEVNULL)
    code, result = _send_signed_request(
        DEFAULT_CA + "/acme/new-cert",
        {
            "resource": "new-cert",
            "csr": _b64(csr_der),
        },
    )
    if code != 201:
        raise ValueError("Error signing certificate: {0} {1}".format(code, result))
    LOGGER.info("Certificate signed!")

    return result
×
366

367

368
def encode_certificate(result):
    """
    Encode cert bytes to PEM encoded cert file.

    `result` is the certificate as bytes. It is base64-encoded, wrapped at 64
    characters per line, framed with the BEGIN/END CERTIFICATE markers and
    written to `signed.crt` in the module's temporary directory.

    Always returns True.
    """
    pem_lines = textwrap.wrap(base64.b64encode(result).decode("utf8"), 64)
    cert_body = """-----BEGIN CERTIFICATE-----\n{0}\n-----END CERTIFICATE-----\n""".format("\n".join(pem_lines))
    # The context manager closes the file even if the write fails.
    with open(os.path.join(gettempdir(), "signed.crt"), "w") as signed_crt:
        signed_crt.write(cert_body)

    return True
5✔
380

381

382
##
383
# Request Utility
384
##
385

386

387
def _b64(b):
    """
    URL-safe base64 encode `b` (bytes) and return it as text with the "="
    padding removed, as the jose spec requires.
    """
    encoded = base64.urlsafe_b64encode(b)
    # "=" only ever occurs as trailing padding in base64 output.
    return encoded.rstrip(b"=").decode("utf8")
5✔
392

393

394
def _send_signed_request(url, payload):
    """
    Sign `payload` with the account key and POST it to `url` on Boulder.

    Returns a `(status_code, body)` tuple. When the request fails with an
    IOError, the tuple holds the error's `code` attribute (or None) and
    either its response body or its string form.
    """
    payload64 = _b64(json.dumps(payload).encode("utf8"))

    header = get_boulder_header(parse_account_key())

    # The protected header is the plain header plus a fresh anti-replay nonce.
    protected = copy.deepcopy(header)
    protected["nonce"] = urlopen(DEFAULT_CA + "/directory").headers["Replay-Nonce"]
    protected64 = _b64(json.dumps(protected).encode("utf8"))

    signing_input = "{0}.{1}".format(protected64, payload64).encode("utf8")
    signer = subprocess.Popen(
        [
            "openssl",
            "dgst",
            "-sha256",
            "-sign",
            os.path.join(gettempdir(), "account.key"),
        ],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    signature, err = signer.communicate(signing_input)
    if signer.returncode != 0:  # pragma: no cover
        raise IOError("OpenSSL Error: {0}".format(err))

    body = json.dumps(
        {
            "header": header,
            "protected": protected64,
            "payload": payload64,
            "signature": _b64(signature),
        }
    )
    try:
        resp = urlopen(url, body.encode("utf8"))
        return resp.getcode(), resp.read()
    except IOError as e:
        # HTTP errors expose code/read(); other IOErrors fall back to str(e).
        return getattr(e, "code", None), getattr(e, "read", e.__str__)()
5✔
430

431

432
##
433
# Temporary Directory Utility
434
##
435

436

437
# Path of the lazily created temporary directory; None until first use.
__tempdir = None


def gettempdir():
    """
    Return the module's temporary directory, creating it with
    tempfile.mkdtemp() on first use. The same path is returned on every
    later call until cleanup() resets it.
    """
    global __tempdir
    if __tempdir is None:
        __tempdir = tempfile.mkdtemp()
    return __tempdir
5✔
450

451

452
@atexit.register
def cleanup():
    """
    Remove the temporary directory and everything in it, if one was created.
    Registered to run automatically when the interpreter exits.
    """
    global __tempdir
    if __tempdir is None:
        # Nothing was ever created, so there is nothing to delete.
        return
    shutil.rmtree(__tempdir)
    __tempdir = None
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc