• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zappa / Zappa / 15524875889

09 Jun 2025 01:38AM UTC coverage: 75.123% (+0.3%) from 74.862%
15524875889

Pull #1384

github

web-flow
Merge 7865a6116 into c2a8e2077
Pull Request #1384: :fire: :wrench: Remove/depreciated pkg_resource usage and other depreciated usage

79 of 145 new or added lines in 3 files covered. (54.48%)

20 existing lines in 4 files now uncovered.

2890 of 3847 relevant lines covered (75.12%)

3.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.95
/zappa/cli.py
1
"""
2
Zappa CLI
3

4
Deploy arbitrary Python programs as serverless Zappa applications.
5

6
"""
7

8
import argparse
5✔
9
import base64
5✔
10
import collections
5✔
11
import importlib
5✔
12
import inspect
5✔
13
import os
5✔
14
import pkgutil
5✔
15
import random
5✔
16
import re
5✔
17
import string
5✔
18
import sys
5✔
19
import tempfile
5✔
20
import time
5✔
21
import zipfile
5✔
22
from builtins import bytes, input
5✔
23
from datetime import datetime, timedelta, timezone
5✔
24
from pathlib import Path
5✔
25
from typing import Optional
5✔
26

27
import argcomplete
5✔
28
import botocore
5✔
29
import click
5✔
30
import hjson as json
5✔
31
import requests
5✔
32
import slugify
5✔
33
import toml
5✔
34
import yaml
5✔
35
from click import Command, Context
5✔
36
from click.exceptions import ClickException
5✔
37
from click.globals import push_context
5✔
38
from dateutil import parser
5✔
39

40
from . import __version__
5✔
41
from .core import API_GATEWAY_REGIONS, Zappa
5✔
42
from .utilities import (
5✔
43
    check_new_version_available,
44
    detect_django_settings,
45
    detect_flask_apps,
46
    get_runtime_from_python_version,
47
    get_venv_from_python_version,
48
    human_size,
49
    is_valid_bucket_name,
50
    parse_s3_url,
51
    string_to_timestamp,
52
    validate_name,
53
)
54

55
CUSTOM_SETTINGS = [
5✔
56
    "apigateway_policy",
57
    "assume_policy",
58
    "attach_policy",
59
    "aws_region",
60
    "delete_local_zip",
61
    "delete_s3_zip",
62
    "exclude",
63
    "exclude_glob",
64
    "extra_permissions",
65
    "include",
66
    "role_name",
67
    "touch",
68
]
69

70
BOTO3_CONFIG_DOCS_URL = "https://boto3.readthedocs.io/en/latest/guide/quickstart.html#configuration"
5✔
71

72

73
##
74
# Main Input Processing
75
##
76

77

78
class ZappaCLI:
5✔
79
    """
80
    ZappaCLI object is responsible for loading the settings,
81
    handling the input arguments and executing the calls to the core library.
82
    """
83

84
    # CLI
85
    vargs = None
5✔
86
    command = None
5✔
87
    stage_env = None
5✔
88

89
    # Zappa settings
90
    zappa = None
5✔
91
    zappa_settings = None
5✔
92
    load_credentials = True
5✔
93
    disable_progress = False
5✔
94

95
    # Specific settings
96
    api_stage = None
5✔
97
    app_function = None
5✔
98
    aws_region = None
5✔
99
    debug = None
5✔
100
    prebuild_script = None
5✔
101
    project_name = None
5✔
102
    profile_name = None
5✔
103
    lambda_arn = None
5✔
104
    lambda_name = None
5✔
105
    lambda_description = None
5✔
106
    lambda_concurrency = None
5✔
107
    s3_bucket_name = None
5✔
108
    settings_file = None
5✔
109
    zip_path = None
5✔
110
    handler_path = None
5✔
111
    vpc_config = None
5✔
112
    memory_size = None
5✔
113
    ephemeral_storage = None
5✔
114
    use_apigateway = None
5✔
115
    lambda_handler = None
5✔
116
    django_settings = None
5✔
117
    manage_roles = True
5✔
118
    exception_handler = None
5✔
119
    environment_variables = None
5✔
120
    authorizer = None
5✔
121
    xray_tracing = False
5✔
122
    aws_kms_key_arn = ""
5✔
123
    snap_start = None
5✔
124
    context_header_mappings = None
5✔
125
    additional_text_mimetypes = None
5✔
126
    tags = []  # type: ignore[var-annotated]
5✔
127
    layers = None
5✔
128
    architecture = None
5✔
129

130
    stage_name_env_pattern = re.compile("^[a-zA-Z0-9_]+$")
5✔
131

132
    def __init__(self):
5✔
133
        self._stage_config_overrides = {}  # change using self.override_stage_config_setting(key, val)
5✔
134

135
    @property
5✔
136
    def stage_config(self):
5✔
137
        """
138
        A shortcut property for settings of a stage.
139
        """
140

141
        def get_stage_setting(stage, extended_stages=None):
5✔
142
            if extended_stages is None:
5✔
143
                extended_stages = []
5✔
144

145
            if stage in extended_stages:
5✔
146
                raise RuntimeError(
5✔
147
                    stage + " has already been extended to these settings. "
148
                    "There is a circular extends within the settings file."
149
                )
150
            extended_stages.append(stage)
5✔
151

152
            try:
5✔
153
                stage_settings = dict(self.zappa_settings[stage].copy())
5✔
154
            except KeyError:
5✔
155
                raise ClickException("Cannot extend settings for undefined stage '" + stage + "'.")
5✔
156

157
            extends_stage = self.zappa_settings[stage].get("extends", None)
5✔
158
            if not extends_stage:
5✔
159
                return stage_settings
5✔
160
            extended_settings = get_stage_setting(stage=extends_stage, extended_stages=extended_stages)
5✔
161
            extended_settings.update(stage_settings)
5✔
162
            return extended_settings
5✔
163

164
        settings = get_stage_setting(stage=self.api_stage)
5✔
165

166
        # Backwards compatible for delete_zip setting that was more explicitly named delete_local_zip
167
        if "delete_zip" in settings:
5✔
168
            settings["delete_local_zip"] = settings.get("delete_zip")
×
169

170
        settings.update(self.stage_config_overrides)
5✔
171

172
        return settings
5✔
173

174
    @property
5✔
175
    def stage_config_overrides(self):
5✔
176
        """
177
        Returns zappa_settings we forcefully override for the current stage
178
        set by `self.override_stage_config_setting(key, value)`
179
        """
180
        return getattr(self, "_stage_config_overrides", {}).get(self.api_stage, {})
5✔
181

182
    def override_stage_config_setting(self, key, val):
5✔
183
        """
184
        Forcefully override a setting set by zappa_settings (for the current stage only)
185
        :param key: settings key
186
        :param val: value
187
        """
188
        self._stage_config_overrides = getattr(self, "_stage_config_overrides", {})
5✔
189
        self._stage_config_overrides.setdefault(self.api_stage, {})[key] = val
5✔
190

191
    def handle(self, argv=None):
5✔
192
        """
193
        Main function.
194
        Parses command, load settings and dispatches accordingly.
195
        """
196

197
        desc = "Zappa - Deploy Python applications to AWS Lambda" " and API Gateway.\n"
×
198
        parser = argparse.ArgumentParser(description=desc)
×
199
        parser.add_argument(
×
200
            "-v",
201
            "--version",
202
            action="version",
203
            version=__version__,
204
            help="Print the zappa version",
205
        )
206
        parser.add_argument("--color", default="auto", choices=["auto", "never", "always"])
×
207

208
        env_parser = argparse.ArgumentParser(add_help=False)
×
209
        me_group = env_parser.add_mutually_exclusive_group()
×
210
        all_help = "Execute this command for all of our defined " "Zappa stages."
×
211
        me_group.add_argument("--all", action="store_true", help=all_help)
×
212
        me_group.add_argument("stage_env", nargs="?")
×
213

214
        group = env_parser.add_argument_group()
×
215
        group.add_argument("-a", "--app_function", help="The WSGI application function.")
×
216
        group.add_argument("-s", "--settings_file", help="The path to a Zappa settings file.")
×
217
        group.add_argument("-q", "--quiet", action="store_true", help="Silence all output.")
×
218
        # https://github.com/Miserlou/Zappa/issues/407
219
        # Moved when 'template' command added.
220
        # Fuck Terraform.
221
        group.add_argument(
×
222
            "-j",
223
            "--json",
224
            action="store_true",
225
            help="Make the output of this command be machine readable.",
226
        )
227
        # https://github.com/Miserlou/Zappa/issues/891
228
        group.add_argument("--disable_progress", action="store_true", help="Disable progress bars.")
×
229
        group.add_argument("--no_venv", action="store_true", help="Skip venv check.")
×
230

231
        ##
232
        # Certify
233
        ##
234
        subparsers = parser.add_subparsers(title="subcommands", dest="command")
×
235
        cert_parser = subparsers.add_parser("certify", parents=[env_parser], help="Create and install SSL certificate")
×
236
        cert_parser.add_argument(
×
237
            "--manual",
238
            action="store_true",
239
            help=("Gets new Let's Encrypt certificates, but prints them to console." "Does not update API Gateway domains."),
240
        )
241
        cert_parser.add_argument("-y", "--yes", action="store_true", help="Auto confirm yes.")
×
242

243
        ##
244
        # Deploy
245
        ##
246
        deploy_parser = subparsers.add_parser("deploy", parents=[env_parser], help="Deploy application.")
×
247
        deploy_parser.add_argument(
×
248
            "-z",
249
            "--zip",
250
            help="Deploy Lambda with specific local or S3 hosted zip package",
251
        )
252
        deploy_parser.add_argument(
×
253
            "-d",
254
            "--docker-image-uri",
255
            help="Deploy Lambda with a specific docker image hosted in AWS Elastic Container Registry",
256
        )
257

258
        ##
259
        # Init
260
        ##
261
        subparsers.add_parser("init", help="Initialize Zappa app.")
×
262

263
        ##
264
        # Package
265
        ##
266
        package_parser = subparsers.add_parser(
×
267
            "package",
268
            parents=[env_parser],
269
            help="Build the application zip package locally.",
270
        )
271
        package_parser.add_argument("-o", "--output", help="Name of file to output the package to.")
×
272

273
        ##
274
        # Template
275
        ##
276
        template_parser = subparsers.add_parser(
×
277
            "template",
278
            parents=[env_parser],
279
            help="Create a CloudFormation template for this API Gateway.",
280
        )
281
        template_parser.add_argument(
×
282
            "-l",
283
            "--lambda-arn",
284
            required=True,
285
            help="ARN of the Lambda function to template to.",
286
        )
287
        template_parser.add_argument("-r", "--role-arn", required=True, help="ARN of the Role to template with.")
×
288
        template_parser.add_argument("-o", "--output", help="Name of file to output the template to.")
×
289

290
        ##
291
        # Invocation
292
        ##
293
        invoke_parser = subparsers.add_parser("invoke", parents=[env_parser], help="Invoke remote function.")
×
294
        invoke_parser.add_argument(
×
295
            "--raw",
296
            action="store_true",
297
            help=("When invoking remotely, invoke this python as a string," " not as a modular path."),
298
        )
299
        invoke_parser.add_argument("--no-color", action="store_true", help=("Don't color the output"))
×
300
        invoke_parser.add_argument("command_rest")
×
301

302
        ##
303
        # Manage
304
        ##
305
        manage_parser = subparsers.add_parser("manage", help="Invoke remote Django manage.py commands.")
×
306
        rest_help = "Command in the form of <env> <command>. <env> is not " "required if --all is specified"
×
307
        manage_parser.add_argument("--all", action="store_true", help=all_help)
×
308
        manage_parser.add_argument("command_rest", nargs="+", help=rest_help)
×
309
        manage_parser.add_argument("--no-color", action="store_true", help=("Don't color the output"))
×
310
        # This is explicitly added here because this is the only subcommand that doesn't inherit from env_parser
311
        # https://github.com/Miserlou/Zappa/issues/1002
312
        manage_parser.add_argument("-s", "--settings_file", help="The path to a Zappa settings file.")
×
313

314
        ##
315
        # Rollback
316
        ##
317
        def positive_int(s):
×
318
            """Ensure an arg is positive"""
319
            i = int(s)
×
320
            if i < 0:
×
321
                msg = "This argument must be positive (got {})".format(s)
×
322
                raise argparse.ArgumentTypeError(msg)
×
323
            return i
×
324

325
        rollback_parser = subparsers.add_parser(
×
326
            "rollback",
327
            parents=[env_parser],
328
            help="Rollback deployed code to a previous version.",
329
        )
330
        rollback_parser.add_argument(
×
331
            "-n",
332
            "--num-rollback",
333
            type=positive_int,
334
            default=1,
335
            help="The number of versions to rollback.",
336
        )
337

338
        ##
339
        # Scheduling
340
        ##
341
        subparsers.add_parser(
×
342
            "schedule",
343
            parents=[env_parser],
344
            help="Schedule functions to occur at regular intervals.",
345
        )
346

347
        ##
348
        # Status
349
        ##
350
        subparsers.add_parser(
×
351
            "status",
352
            parents=[env_parser],
353
            help="Show deployment status and event schedules.",
354
        )
355

356
        ##
357
        # Log Tailing
358
        ##
359
        tail_parser = subparsers.add_parser("tail", parents=[env_parser], help="Tail deployment logs.")
×
360
        tail_parser.add_argument("--no-color", action="store_true", help="Don't color log tail output.")
×
361
        tail_parser.add_argument(
×
362
            "--http",
363
            action="store_true",
364
            help="Only show HTTP requests in tail output.",
365
        )
366
        tail_parser.add_argument(
×
367
            "--non-http",
368
            action="store_true",
369
            help="Only show non-HTTP requests in tail output.",
370
        )
371
        tail_parser.add_argument(
×
372
            "--since",
373
            type=str,
374
            default="100000s",
375
            help="Only show lines since a certain timeframe.",
376
        )
377
        tail_parser.add_argument("--filter", type=str, default="", help="Apply a filter pattern to the logs.")
×
378
        tail_parser.add_argument(
×
379
            "--force-color",
380
            action="store_true",
381
            help="Force coloring log tail output even if coloring support is not auto-detected. (example: piping)",
382
        )
383
        tail_parser.add_argument(
×
384
            "--disable-keep-open",
385
            action="store_true",
386
            help="Exit after printing the last available log, rather than keeping the log open.",
387
        )
388

389
        ##
390
        # Undeploy
391
        ##
392
        undeploy_parser = subparsers.add_parser("undeploy", parents=[env_parser], help="Undeploy application.")
×
393
        undeploy_parser.add_argument(
×
394
            "--remove-logs",
395
            action="store_true",
396
            help=("Removes log groups of api gateway and lambda task" " during the undeployment."),
397
        )
398
        undeploy_parser.add_argument("-y", "--yes", action="store_true", help="Auto confirm yes.")
×
399

400
        ##
401
        # Unschedule
402
        ##
403
        subparsers.add_parser("unschedule", parents=[env_parser], help="Unschedule functions.")
×
404

405
        ##
406
        # Updating
407
        ##
408
        update_parser = subparsers.add_parser("update", parents=[env_parser], help="Update deployed application.")
×
409
        update_parser.add_argument(
×
410
            "-z",
411
            "--zip",
412
            help="Update Lambda with specific local or S3 hosted zip package",
413
        )
414
        update_parser.add_argument(
×
415
            "-n",
416
            "--no-upload",
417
            help="Update configuration where appropriate, but don't upload new code",
418
        )
419
        update_parser.add_argument(
×
420
            "-d",
421
            "--docker-image-uri",
422
            help="Update Lambda with a specific docker image hosted in AWS Elastic Container Registry",
423
        )
424

425
        ##
426
        # Debug
427
        ##
428
        subparsers.add_parser(
×
429
            "shell",
430
            parents=[env_parser],
431
            help="A debug shell with a loaded Zappa object.",
432
        )
433

434
        ##
435
        # Python Settings File
436
        ##
437
        settings_parser = subparsers.add_parser(
×
438
            "save-python-settings-file",
439
            parents=[env_parser],
440
            help="Generate & save the Zappa settings Python file for docker deployments",
441
        )
442
        settings_parser.add_argument(
×
443
            "-o",
444
            "--output_path",
445
            help=(
446
                "The path to save the Zappa settings Python file. "
447
                "File must be named zappa_settings.py and should be saved "
448
                "in the same directory as the Zappa handler.py"
449
            ),
450
        )
451

452
        argcomplete.autocomplete(parser)
×
453
        args = parser.parse_args(argv)
×
454
        self.vargs = vars(args)
×
455

456
        if args.color == "never":
×
457
            disable_click_colors()
×
458
        elif args.color == "always":
×
459
            # TODO: Support aggressive coloring like "--force-color" on all commands
460
            pass
×
461
        elif args.color == "auto":
×
UNCOV
462
            pass
×
463

464
        # Parse the input
465
        # NOTE(rmoe): Special case for manage command
466
        # The manage command can't have both stage_env and command_rest
467
        # arguments. Since they are both positional arguments argparse can't
468
        # differentiate the two. This causes problems when used with --all.
469
        # (e.g. "manage --all showmigrations admin" argparse thinks --all has
470
        # been specified AND that stage_env='showmigrations')
471
        # By having command_rest collect everything but --all we can split it
472
        # apart here instead of relying on argparse.
473
        if not args.command:
×
474
            parser.print_help()
×
475
            return
×
476

477
        if args.command == "manage" and not self.vargs.get("all"):
×
478
            self.stage_env = self.vargs["command_rest"].pop(0)
×
479
        else:
480
            self.stage_env = self.vargs.get("stage_env")
×
481

482
        if args.command in ("package", "save-python-settings-file"):
×
483
            self.load_credentials = False
×
484

485
        self.command = args.command
×
486

487
        self.disable_progress = self.vargs.get("disable_progress")
×
488
        if self.vargs.get("quiet"):
×
489
            self.silence()
×
490

491
        # We don't have any settings yet, so make those first!
492
        # (Settings-based interactions will fail
493
        # before a project has been initialized.)
494
        if self.command == "init":
×
495
            self.init()
×
496
            return
×
497

498
        # Make sure there isn't a new version available
499
        if not self.vargs.get("json"):
×
500
            self.check_for_update()
×
501

502
        # Load and Validate Settings File
503
        self.load_settings_file(self.vargs.get("settings_file"))
×
504

505
        # Should we execute this for all stages, or just one?
506
        all_stages = self.vargs.get("all")
×
507
        stages = []
×
508

509
        if all_stages:  # All stages!
×
510
            stages = self.zappa_settings.keys()
×
511
        else:  # Just one env.
512
            if not self.stage_env:
×
513
                # If there's only one stage defined in the settings,
514
                # use that as the default.
515
                if len(self.zappa_settings.keys()) == 1:
×
516
                    stages.append(list(self.zappa_settings.keys())[0])
×
517
                else:
518
                    parser.error("Please supply a stage to interact with.")
×
519
            else:
520
                stages.append(self.stage_env)
×
521

522
        for stage in stages:
×
523
            try:
×
524
                self.dispatch_command(self.command, stage)
×
525
            except ClickException as e:
×
526
                # Discussion on exit codes: https://github.com/Miserlou/Zappa/issues/407
527
                e.show()
×
528
                sys.exit(e.exit_code)
×
529

530
    def dispatch_command(self, command, stage):
5✔
531
        """
532
        Given a command to execute and stage,
533
        execute that command.
534
        """
535
        self.check_stage_name(stage)
5✔
536
        self.api_stage = stage
×
537

538
        if command not in ["status", "manage"]:
×
539
            if not self.vargs.get("json", None):
×
540
                click.echo(
×
541
                    "Calling "
542
                    + click.style(command, fg="green", bold=True)
543
                    + " for stage "
544
                    + click.style(self.api_stage, bold=True)
545
                    + ".."
546
                )
547

548
        # Explicitly define the app function.
549
        # Related: https://github.com/Miserlou/Zappa/issues/832
550
        if self.vargs.get("app_function", None):
×
551
            self.app_function = self.vargs["app_function"]
×
552

553
        # Load our settings, based on api_stage.
554
        try:
×
555
            self.load_settings(self.vargs.get("settings_file"))
×
556
        except ValueError as e:
×
557
            if hasattr(e, "message"):
×
558
                print("Error: {}".format(e.message))
×
559
            else:
560
                print(str(e))
×
561
            sys.exit(-1)
×
562
        self.callback("settings")
×
563

564
        # Hand it off
565
        if command == "deploy":  # pragma: no cover
566
            self.deploy(self.vargs["zip"], self.vargs["docker_image_uri"])
567
        if command == "package":  # pragma: no cover
568
            self.package(self.vargs["output"])
569
        if command == "template":  # pragma: no cover
570
            self.template(
571
                self.vargs["lambda_arn"],
572
                self.vargs["role_arn"],
573
                output=self.vargs["output"],
574
                json=self.vargs["json"],
575
            )
576
        elif command == "update":  # pragma: no cover
577
            self.update(
578
                self.vargs["zip"],
579
                self.vargs["no_upload"],
580
                self.vargs["docker_image_uri"],
581
            )
582
        elif command == "rollback":  # pragma: no cover
583
            self.rollback(self.vargs["num_rollback"])
584
        elif command == "invoke":  # pragma: no cover
585
            if not self.vargs.get("command_rest"):
586
                print("Please enter the function to invoke.")
587
                return
588

589
            self.invoke(
590
                self.vargs["command_rest"],
591
                raw_python=self.vargs["raw"],
592
                no_color=self.vargs["no_color"],
593
            )
594
        elif command == "manage":  # pragma: no cover
595
            if not self.vargs.get("command_rest"):
596
                print("Please enter the management command to invoke.")
597
                return
598

599
            if not self.django_settings:
600
                print("This command is for Django projects only!")
601
                print("If this is a Django project, please define django_settings in your zappa_settings.")
602
                return
603

604
            command_tail = self.vargs.get("command_rest")
605
            if len(command_tail) > 1:
606
                command = " ".join(command_tail)  # ex: zappa manage dev "shell --version"
607
            else:
608
                command = command_tail[0]  # ex: zappa manage dev showmigrations admin
609

610
            self.invoke(
611
                command,
612
                command="manage",
613
                no_color=self.vargs["no_color"],
614
            )
615

616
        elif command == "tail":  # pragma: no cover
617
            self.tail(
618
                colorize=(not self.vargs["no_color"]),
619
                http=self.vargs["http"],
620
                non_http=self.vargs["non_http"],
621
                since=self.vargs["since"],
622
                filter_pattern=self.vargs["filter"],
623
                force_colorize=self.vargs["force_color"] or None,
624
                keep_open=not self.vargs["disable_keep_open"],
625
            )
626
        elif command == "undeploy":  # pragma: no cover
627
            self.undeploy(no_confirm=self.vargs["yes"], remove_logs=self.vargs["remove_logs"])
628
        elif command == "schedule":  # pragma: no cover
629
            self.schedule()
630
        elif command == "unschedule":  # pragma: no cover
631
            self.unschedule()
632
        elif command == "status":  # pragma: no cover
633
            self.status(return_json=self.vargs["json"])
634
        elif command == "certify":  # pragma: no cover
635
            self.certify(no_confirm=self.vargs["yes"], manual=self.vargs["manual"])
636
        elif command == "shell":  # pragma: no cover
637
            self.shell()
638
        elif command == "save-python-settings-file":  # pragma: no cover
639
            self.save_python_settings_file(self.vargs["output_path"])
640

641
    ##
642
    # The Commands
643
    ##
644

645
    def save_python_settings_file(self, output_path=None):
5✔
646
        settings_path = output_path or "zappa_settings.py"
5✔
647
        print("Generating Zappa settings Python file and saving to {}".format(settings_path))
5✔
648
        if not settings_path.endswith("zappa_settings.py"):
5✔
649
            raise ValueError("Settings file must be named zappa_settings.py")
5✔
650
        zappa_settings_s = self.get_zappa_settings_string()
5✔
651
        with open(settings_path, "w") as f_out:
5✔
652
            f_out.write(zappa_settings_s)
5✔
653

654
    def package(self, output=None):
5✔
655
        """
656
        Only build the package
657
        """
658
        # Make sure we're in a venv.
659
        self.check_venv()
5✔
660

661
        # force not to delete the local zip
662
        self.override_stage_config_setting("delete_local_zip", False)
5✔
663
        # Execute the prebuild script
664
        if self.prebuild_script:
5✔
665
            self.execute_prebuild_script()
5✔
666
        # Create the Lambda Zip
667
        self.create_package(output)
5✔
668
        self.callback("zip")
5✔
669
        size = human_size(os.path.getsize(self.zip_path))
5✔
670
        click.echo(
5✔
671
            click.style("Package created", fg="green", bold=True)
672
            + ": "
673
            + click.style(self.zip_path, bold=True)
674
            + " ("
675
            + size
676
            + ")"
677
        )
678

679
    def template(self, lambda_arn, role_arn, output=None, json=False):
5✔
680
        """
681
        Only build the template file.
682
        """
683

684
        if not lambda_arn:
×
685
            raise ClickException("Lambda ARN is required to template.")
×
686

687
        if not role_arn:
×
688
            raise ClickException("Role ARN is required to template.")
×
689

690
        self.zappa.credentials_arn = role_arn
×
691

692
        # Create the template!
693
        template = self.zappa.create_stack_template(
×
694
            lambda_arn=lambda_arn,
695
            lambda_name=self.lambda_name,
696
            api_key_required=self.api_key_required,
697
            iam_authorization=self.iam_authorization,
698
            authorizer=self.authorizer,
699
            cors_options=self.cors,
700
            description=self.apigateway_description,
701
            endpoint_configuration=self.endpoint_configuration,
702
        )
703

704
        if not output:
×
705
            template_file = self.lambda_name + "-template-" + str(int(time.time())) + ".json"
×
706
        else:
707
            template_file = output
×
708
        with open(template_file, "wb") as out:
×
709
            out.write(bytes(template.to_json(indent=None, separators=(",", ":")), "utf-8"))
×
710

711
        if not json:
×
712
            click.echo(click.style("Template created", fg="green", bold=True) + ": " + click.style(template_file, bold=True))
×
713
        else:
714
            with open(template_file, "r") as out:
×
715
                print(out.read())
×
716

717
    def deploy(self, source_zip=None, docker_image_uri=None):
5✔
718
        """
719
        Package your project, upload it to S3, register the Lambda function
720
        and create the API Gateway routes.
721
        """
722

723
        if not source_zip or docker_image_uri:
5✔
724
            # Make sure the necessary IAM execution roles are available
725
            if self.manage_roles:
5✔
726
                try:
5✔
727
                    self.zappa.create_iam_roles()
5✔
728
                except botocore.client.ClientError as ce:
×
729
                    raise ClickException(
×
730
                        click.style("Failed", fg="red")
731
                        + " to "
732
                        + click.style("manage IAM roles", bold=True)
733
                        + "!\n"
734
                        + "You may "
735
                        + click.style("lack the necessary AWS permissions", bold=True)
736
                        + " to automatically manage a Zappa execution role.\n"
737
                        + click.style("Exception reported by AWS:", bold=True)
738
                        + format(ce)
739
                        + "\n"
740
                        + "To fix this, see here: "
741
                        + click.style(
742
                            "https://github.com/Zappa/Zappa#custom-aws-iam-roles-and-policies-for-deployment",
743
                            bold=True,
744
                        )
745
                        + "\n"
746
                    )
747

748
        # Make sure this isn't already deployed.
749
        deployed_versions = self.zappa.get_lambda_function_versions(self.lambda_name)
5✔
750
        if len(deployed_versions) > 0:
5✔
751
            raise ClickException(
×
752
                "This application is "
753
                + click.style("already deployed", fg="red")
754
                + " - did you mean to call "
755
                + click.style("update", bold=True)
756
                + "?"
757
            )
758

759
        if not source_zip and not docker_image_uri:
5✔
760
            # Make sure we're in a venv.
761
            self.check_venv()
5✔
762

763
            # Execute the prebuild script
764
            if self.prebuild_script:
5✔
765
                self.execute_prebuild_script()
5✔
766

767
            # Create the Lambda Zip
768
            self.create_package()
5✔
769
            self.callback("zip")
5✔
770

771
            # Upload it to S3
772
            success = self.zappa.upload_to_s3(
5✔
773
                self.zip_path,
774
                self.s3_bucket_name,
775
                disable_progress=self.disable_progress,
776
            )
777
            if not success:  # pragma: no cover
778
                raise ClickException("Unable to upload to S3. Quitting.")
779

780
            # If using a slim handler, upload it to S3 and tell lambda to use this slim handler zip
781
            if self.stage_config.get("slim_handler", False):
5✔
782
                # https://github.com/Miserlou/Zappa/issues/510
783
                success = self.zappa.upload_to_s3(
×
784
                    self.handler_path,
785
                    self.s3_bucket_name,
786
                    disable_progress=self.disable_progress,
787
                )
788
                if not success:  # pragma: no cover
789
                    raise ClickException("Unable to upload handler to S3. Quitting.")
790

791
                # Copy the project zip to the current project zip
792
                current_project_name = "{0!s}_{1!s}_current_project.tar.gz".format(self.api_stage, self.project_name)
×
793
                success = self.zappa.copy_on_s3(
×
794
                    src_file_name=self.zip_path,
795
                    dst_file_name=current_project_name,
796
                    bucket_name=self.s3_bucket_name,
797
                )
798
                if not success:  # pragma: no cover
799
                    raise ClickException("Unable to copy the zip to be the current project. Quitting.")
800

801
                handler_file = self.handler_path
×
802
            else:
803
                handler_file = self.zip_path
5✔
804

805
        # Fixes https://github.com/Miserlou/Zappa/issues/613
806
        try:
5✔
807
            self.lambda_arn = self.zappa.get_lambda_function(function_name=self.lambda_name)
5✔
808
        except botocore.client.ClientError:
×
809
            # Register the Lambda function with that zip as the source
810
            # You'll also need to define the path to your lambda_handler code.
811
            kwargs = dict(
×
812
                handler=self.lambda_handler,
813
                description=self.lambda_description,
814
                vpc_config=self.vpc_config,
815
                dead_letter_config=self.dead_letter_config,
816
                timeout=self.timeout_seconds,
817
                memory_size=self.memory_size,
818
                ephemeral_storage=self.ephemeral_storage,
819
                runtime=self.runtime,
820
                aws_environment_variables=self.aws_environment_variables,
821
                aws_kms_key_arn=self.aws_kms_key_arn,
822
                use_alb=self.use_alb,
823
                layers=self.layers,
824
                concurrency=self.lambda_concurrency,
825
            )
826
            kwargs["function_name"] = self.lambda_name
×
827
            if docker_image_uri:
×
828
                kwargs["docker_image_uri"] = docker_image_uri
×
829
            elif source_zip and source_zip.startswith("s3://"):
×
830
                bucket, key_name = parse_s3_url(source_zip)
×
831
                kwargs["bucket"] = bucket
×
832
                kwargs["s3_key"] = key_name
×
833
            elif source_zip and not source_zip.startswith("s3://"):
×
834
                with open(source_zip, mode="rb") as fh:
×
835
                    byte_stream = fh.read()
×
836
                kwargs["local_zip"] = byte_stream
×
837
            else:
838
                kwargs["bucket"] = self.s3_bucket_name
×
839
                kwargs["s3_key"] = handler_file
×
840

841
            self.lambda_arn = self.zappa.create_lambda_function(**kwargs)
×
842

843
        # Schedule events for this deployment
844
        self.schedule()
5✔
845

846
        endpoint_url = ""
5✔
847
        deployment_string = click.style("Deployment complete", fg="green", bold=True) + "!"
5✔
848

849
        if self.use_alb:
5✔
850
            kwargs = dict(
×
851
                lambda_arn=self.lambda_arn,
852
                lambda_name=self.lambda_name,
853
                alb_vpc_config=self.alb_vpc_config,
854
                timeout=self.timeout_seconds,
855
            )
856
            self.zappa.deploy_lambda_alb(**kwargs)
×
857

858
        if self.use_apigateway:
5✔
859
            # Create and configure the API Gateway
860
            self.zappa.create_stack_template(
5✔
861
                lambda_arn=self.lambda_arn,
862
                lambda_name=self.lambda_name,
863
                api_key_required=self.api_key_required,
864
                iam_authorization=self.iam_authorization,
865
                authorizer=self.authorizer,
866
                cors_options=self.cors,
867
                description=self.apigateway_description,
868
                endpoint_configuration=self.endpoint_configuration,
869
            )
870

871
            self.zappa.update_stack(
5✔
872
                self.lambda_name,
873
                self.s3_bucket_name,
874
                wait=True,
875
                disable_progress=self.disable_progress,
876
            )
877

878
            api_id = self.zappa.get_api_id(self.lambda_name)
5✔
879

880
            # Add binary support
881
            if self.binary_support:
5✔
882
                self.zappa.add_binary_support(api_id=api_id, cors=self.cors)
5✔
883

884
            # Add payload compression
885
            if self.stage_config.get("payload_compression", True):
5✔
886
                self.zappa.add_api_compression(
5✔
887
                    api_id=api_id,
888
                    min_compression_size=self.stage_config.get("payload_minimum_compression_size", 0),
889
                )
890

891
            # Deploy the API!
892
            endpoint_url = self.deploy_api_gateway(api_id)
5✔
893
            deployment_string = deployment_string + ": {}".format(endpoint_url)
5✔
894

895
            # Create/link API key
896
            if self.api_key_required:
5✔
897
                if self.api_key is None:
×
898
                    self.zappa.create_api_key(api_id=api_id, stage_name=self.api_stage)
×
899
                else:
900
                    self.zappa.add_api_stage_to_api_key(api_key=self.api_key, api_id=api_id, stage_name=self.api_stage)
×
901

902
            if self.stage_config.get("touch", True):
5✔
903
                self.zappa.wait_until_lambda_function_is_updated(function_name=self.lambda_name)
×
904
                self.touch_endpoint(endpoint_url)
×
905

906
        # Finally, delete the local copy our zip package
907
        if not source_zip and not docker_image_uri:
5✔
908
            if self.stage_config.get("delete_local_zip", True):
5✔
909
                self.remove_local_zip()
5✔
910

911
        # Remove the project zip from S3.
912
        if not source_zip and not docker_image_uri:
5✔
913
            self.remove_uploaded_zip()
5✔
914

915
        self.callback("post")
5✔
916

917
        click.echo(deployment_string)
5✔
918

919
    def update(self, source_zip=None, no_upload=False, docker_image_uri=None):
5✔
920
        """
921
        Repackage and update the function code.
922
        """
923

924
        if not source_zip and not docker_image_uri:
5✔
925
            # Make sure we're in a venv.
926
            self.check_venv()
5✔
927

928
            # Execute the prebuild script
929
            if self.prebuild_script:
5✔
930
                self.execute_prebuild_script()
5✔
931

932
            # Temporary version check
933
            try:
5✔
934
                updated_time = 1472581018
5✔
935
                function_response = self.zappa.lambda_client.get_function(FunctionName=self.lambda_name)
5✔
936
                conf = function_response["Configuration"]
5✔
937
                last_updated = parser.parse(conf["LastModified"])
5✔
938
                last_updated_unix = time.mktime(last_updated.timetuple())
5✔
939
            except botocore.exceptions.BotoCoreError as e:
×
940
                click.echo(click.style(type(e).__name__, fg="red") + ": " + e.args[0])
×
941
                sys.exit(-1)
×
942
            # https://github.com/zappa/Zappa/issues/1313
943
            except botocore.exceptions.ClientError as e:
×
944
                click.echo(click.style(type(e).__name__, fg="red") + ": " + e.args[0])
×
945
                sys.exit(-1)
×
946
            except Exception:
×
947
                click.echo(
×
948
                    click.style("Warning!", fg="red")
949
                    + " Couldn't get function "
950
                    + self.lambda_name
951
                    + " in "
952
                    + self.zappa.aws_region
953
                    + " - have you deployed yet?"
954
                )
955
                sys.exit(-1)
×
956

957
            if last_updated_unix <= updated_time:
5✔
958
                click.echo(
5✔
959
                    click.style("Warning!", fg="red")
960
                    + " You may have upgraded Zappa since deploying this application. You will need to "
961
                    + click.style("redeploy", bold=True)
962
                    + " for this deployment to work properly!"
963
                )
964

965
            # Make sure the necessary IAM execution roles are available
966
            if self.manage_roles:
5✔
967
                try:
5✔
968
                    self.zappa.create_iam_roles()
5✔
969
                except botocore.client.ClientError:
×
970
                    click.echo(click.style("Failed", fg="red") + " to " + click.style("manage IAM roles", bold=True) + "!")
×
971
                    click.echo(
×
972
                        "You may "
973
                        + click.style("lack the necessary AWS permissions", bold=True)
974
                        + " to automatically manage a Zappa execution role."
975
                    )
976
                    click.echo(
×
977
                        "To fix this, see here: "
978
                        + click.style(
979
                            "https://github.com/Zappa/Zappa#custom-aws-iam-roles-and-policies-for-deployment",
980
                            bold=True,
981
                        )
982
                    )
983
                    sys.exit(-1)
×
984

985
            # Create the Lambda Zip,
986
            if not no_upload:
5✔
987
                self.create_package()
5✔
988
                self.callback("zip")
5✔
989

990
            # Upload it to S3
991
            if not no_upload:
5✔
992
                success = self.zappa.upload_to_s3(
5✔
993
                    self.zip_path,
994
                    self.s3_bucket_name,
995
                    disable_progress=self.disable_progress,
996
                )
997
                if not success:  # pragma: no cover
998
                    raise ClickException("Unable to upload project to S3. Quitting.")
999

1000
                # If using a slim handler, upload it to S3 and tell lambda to use this slim handler zip
1001
                if self.stage_config.get("slim_handler", False):
5✔
1002
                    # https://github.com/Miserlou/Zappa/issues/510
1003
                    success = self.zappa.upload_to_s3(
×
1004
                        self.handler_path,
1005
                        self.s3_bucket_name,
1006
                        disable_progress=self.disable_progress,
1007
                    )
1008
                    if not success:  # pragma: no cover
1009
                        raise ClickException("Unable to upload handler to S3. Quitting.")
1010

1011
                    # Copy the project zip to the current project zip
1012
                    current_project_name = "{0!s}_{1!s}_current_project.tar.gz".format(self.api_stage, self.project_name)
×
1013
                    success = self.zappa.copy_on_s3(
×
1014
                        src_file_name=self.zip_path,
1015
                        dst_file_name=current_project_name,
1016
                        bucket_name=self.s3_bucket_name,
1017
                    )
1018
                    if not success:  # pragma: no cover
1019
                        raise ClickException("Unable to copy the zip to be the current project. Quitting.")
1020

1021
                    handler_file = self.handler_path
×
1022
                else:
1023
                    handler_file = self.zip_path
5✔
1024

1025
        # Register the Lambda function with that zip as the source
1026
        # You'll also need to define the path to your lambda_handler code.
1027
        kwargs = dict(
5✔
1028
            bucket=self.s3_bucket_name,
1029
            function_name=self.lambda_name,
1030
            num_revisions=self.num_retained_versions,
1031
            concurrency=self.lambda_concurrency,
1032
        )
1033
        if docker_image_uri:
5✔
1034
            kwargs["docker_image_uri"] = docker_image_uri
×
1035
            self.lambda_arn = self.zappa.update_lambda_function(**kwargs)
×
1036
        elif source_zip and source_zip.startswith("s3://"):
5✔
1037
            bucket, key_name = parse_s3_url(source_zip)
×
1038
            kwargs.update(dict(bucket=bucket, s3_key=key_name))
×
1039
            self.lambda_arn = self.zappa.update_lambda_function(**kwargs)
×
1040
        elif source_zip and not source_zip.startswith("s3://"):
5✔
1041
            with open(source_zip, mode="rb") as fh:
×
1042
                byte_stream = fh.read()
×
1043
            kwargs["local_zip"] = byte_stream
×
1044
            self.lambda_arn = self.zappa.update_lambda_function(**kwargs)
×
1045
        else:
1046
            if not no_upload:
5✔
1047
                kwargs["s3_key"] = handler_file
5✔
1048
                self.lambda_arn = self.zappa.update_lambda_function(**kwargs)
5✔
1049

1050
        # Remove the uploaded zip from S3, because it is now registered..
1051
        if not source_zip and not no_upload and not docker_image_uri:
5✔
1052
            self.remove_uploaded_zip()
5✔
1053

1054
        # Update the configuration, in case there are changes.
1055
        self.lambda_arn = self.zappa.update_lambda_configuration(
5✔
1056
            lambda_arn=self.lambda_arn,
1057
            function_name=self.lambda_name,
1058
            handler=self.lambda_handler,
1059
            description=self.lambda_description,
1060
            vpc_config=self.vpc_config,
1061
            timeout=self.timeout_seconds,
1062
            memory_size=self.memory_size,
1063
            ephemeral_storage=self.ephemeral_storage,
1064
            runtime=self.runtime,
1065
            aws_environment_variables=self.aws_environment_variables,
1066
            aws_kms_key_arn=self.aws_kms_key_arn,
1067
            layers=self.layers,
1068
            snap_start=self.snap_start,
1069
            wait=False,
1070
        )
1071

1072
        # Finally, delete the local copy our zip package
1073
        if not source_zip and not no_upload and not docker_image_uri:
5✔
1074
            if self.stage_config.get("delete_local_zip", True):
5✔
1075
                self.remove_local_zip()
5✔
1076

1077
        if self.use_apigateway:
5✔
1078
            self.zappa.create_stack_template(
5✔
1079
                lambda_arn=self.lambda_arn,
1080
                lambda_name=self.lambda_name,
1081
                api_key_required=self.api_key_required,
1082
                iam_authorization=self.iam_authorization,
1083
                authorizer=self.authorizer,
1084
                cors_options=self.cors,
1085
                description=self.apigateway_description,
1086
                endpoint_configuration=self.endpoint_configuration,
1087
            )
1088
            self.zappa.update_stack(
5✔
1089
                self.lambda_name,
1090
                self.s3_bucket_name,
1091
                wait=True,
1092
                update_only=True,
1093
                disable_progress=self.disable_progress,
1094
            )
1095

1096
            api_id = self.zappa.get_api_id(self.lambda_name)
5✔
1097

1098
            # Update binary support
1099
            if self.binary_support:
5✔
1100
                self.zappa.add_binary_support(api_id=api_id, cors=self.cors)
5✔
1101
            else:
1102
                self.zappa.remove_binary_support(api_id=api_id, cors=self.cors)
×
1103

1104
            if self.stage_config.get("payload_compression", True):
5✔
1105
                self.zappa.add_api_compression(
5✔
1106
                    api_id=api_id,
1107
                    min_compression_size=self.stage_config.get("payload_minimum_compression_size", 0),
1108
                )
1109
            else:
1110
                self.zappa.remove_api_compression(api_id=api_id)
×
1111

1112
            # It looks a bit like we might actually be using this just to get the URL,
1113
            # but we're also updating a few of the APIGW settings.
1114
            endpoint_url = self.deploy_api_gateway(api_id)
5✔
1115

1116
            if self.stage_config.get("domain", None):
5✔
1117
                endpoint_url = self.stage_config.get("domain")
×
1118

1119
        else:
1120
            endpoint_url = None
×
1121

1122
        self.schedule()
5✔
1123

1124
        # Update any cognito pool with the lambda arn
1125
        # do this after schedule as schedule clears the lambda policy and we need to add one
1126
        self.update_cognito_triggers()
5✔
1127

1128
        self.callback("post")
5✔
1129

1130
        if endpoint_url and "https://" not in endpoint_url:
5✔
1131
            endpoint_url = "https://" + endpoint_url
×
1132

1133
        if self.base_path:
5✔
1134
            endpoint_url += "/" + self.base_path
×
1135

1136
        deployed_string = "Your updated Zappa deployment is " + click.style("live", fg="green", bold=True) + "!"
5✔
1137
        if self.use_apigateway:
5✔
1138
            deployed_string = deployed_string + ": " + click.style("{}".format(endpoint_url), bold=True)
5✔
1139

1140
            api_url = None
5✔
1141
            if endpoint_url and "amazonaws.com" not in endpoint_url:
5✔
1142
                api_url = self.zappa.get_api_url(self.lambda_name, self.api_stage)
×
1143

1144
                if endpoint_url != api_url:
×
1145
                    deployed_string = deployed_string + " (" + api_url + ")"
×
1146

1147
            if self.stage_config.get("touch", True):
5✔
1148
                self.zappa.wait_until_lambda_function_is_updated(function_name=self.lambda_name)
×
1149
                if api_url:
×
1150
                    self.touch_endpoint(api_url)
×
1151
                elif endpoint_url:
×
1152
                    self.touch_endpoint(endpoint_url)
×
1153

1154
        click.echo(deployed_string)
5✔
1155

1156
    def rollback(self, revision):
5✔
1157
        """
1158
        Rollsback the currently deploy lambda code to a previous revision.
1159
        """
1160

1161
        print("Rolling back..")
5✔
1162

1163
        self.zappa.rollback_lambda_function_version(self.lambda_name, versions_back=revision)
5✔
1164
        print("Done!")
5✔
1165

1166
    def tail(
5✔
1167
        self,
1168
        since,
1169
        filter_pattern,
1170
        limit=10000,
1171
        keep_open=True,
1172
        colorize=True,
1173
        http=False,
1174
        non_http=False,
1175
        force_colorize=False,
1176
    ):
1177
        """
1178
        Tail this function's logs.
1179
        if keep_open, do so repeatedly, printing any new logs
1180
        """
1181

1182
        try:
5✔
1183
            since_stamp = string_to_timestamp(since)
5✔
1184

1185
            last_since = since_stamp
5✔
1186
            while True:
5✔
1187
                new_logs = self.zappa.fetch_logs(
5✔
1188
                    self.lambda_name,
1189
                    start_time=since_stamp,
1190
                    limit=limit,
1191
                    filter_pattern=filter_pattern,
1192
                )
1193

1194
                new_logs = [e for e in new_logs if e["timestamp"] > last_since]
5✔
1195
                self.print_logs(new_logs, colorize, http, non_http, force_colorize)
5✔
1196

1197
                if not keep_open:
5✔
1198
                    break
5✔
1199
                if new_logs:
×
1200
                    last_since = new_logs[-1]["timestamp"]
×
1201
                time.sleep(1)
×
1202
        except KeyboardInterrupt:  # pragma: no cover
1203
            # Die gracefully
1204
            try:
1205
                sys.exit(0)
1206
            except SystemExit:
1207
                os._exit(130)
1208

1209
    def undeploy(self, no_confirm=False, remove_logs=False):
5✔
1210
        """
1211
        Tear down an existing deployment.
1212
        """
1213

1214
        if not no_confirm:  # pragma: no cover
1215
            confirm = input("Are you sure you want to undeploy? [y/n] ")
1216
            if confirm != "y":
1217
                return
1218

1219
        if self.use_alb:
5✔
1220
            self.zappa.undeploy_lambda_alb(self.lambda_name)
×
1221

1222
        if self.use_apigateway:
5✔
1223
            if remove_logs:
5✔
1224
                self.zappa.remove_api_gateway_logs(self.lambda_name)
5✔
1225

1226
            domain_name = self.stage_config.get("domain", None)
5✔
1227
            base_path = self.stage_config.get("base_path", None)
5✔
1228

1229
            # Only remove the api key when not specified
1230
            if self.api_key_required and self.api_key is None:
5✔
1231
                api_id = self.zappa.get_api_id(self.lambda_name)
×
1232
                self.zappa.remove_api_key(api_id, self.api_stage)
×
1233

1234
            self.zappa.undeploy_api_gateway(self.lambda_name, domain_name=domain_name, base_path=base_path)
5✔
1235

1236
        self.unschedule()  # removes event triggers, including warm up event.
5✔
1237

1238
        self.zappa.delete_lambda_function(self.lambda_name)
5✔
1239
        if remove_logs:
5✔
1240
            self.zappa.remove_lambda_function_logs(self.lambda_name)
5✔
1241

1242
        click.echo(click.style("Done", fg="green", bold=True) + "!")
5✔
1243

1244
    def update_cognito_triggers(self):
5✔
1245
        """
1246
        Update any cognito triggers
1247
        """
1248
        if self.cognito:
5✔
1249
            user_pool = self.cognito.get("user_pool")
5✔
1250
            triggers = self.cognito.get("triggers", [])
5✔
1251
            lambda_configs = set()
5✔
1252
            for trigger in triggers:
5✔
1253
                lambda_configs.add(trigger["source"].split("_")[0])
5✔
1254
            self.zappa.update_cognito(self.lambda_name, user_pool, lambda_configs, self.lambda_arn)
5✔
1255

1256
    def schedule(self):
5✔
1257
        """
1258
        Given a a list of functions and a schedule to execute them,
1259
        setup up regular execution.
1260
        """
1261
        events = self.stage_config.get("events", [])
5✔
1262

1263
        if events:
5✔
1264
            if not isinstance(events, list):  # pragma: no cover
1265
                print("Events must be supplied as a list.")
1266
                return
1267

1268
        for event in events:
5✔
1269
            self.collision_warning(event.get("function"))
5✔
1270

1271
        if self.stage_config.get("keep_warm", True):
5✔
1272
            if not events:
5✔
1273
                events = []
×
1274

1275
            keep_warm_rate = self.stage_config.get("keep_warm_expression", "rate(4 minutes)")
5✔
1276
            events.append(
5✔
1277
                {
1278
                    "name": "zappa-keep-warm",
1279
                    "function": "handler.keep_warm_callback",
1280
                    "expression": keep_warm_rate,
1281
                    "description": "Zappa Keep Warm - {}".format(self.lambda_name),
1282
                }
1283
            )
1284

1285
        if events:
5✔
1286
            try:
5✔
1287
                function_response = self.zappa.lambda_client.get_function(FunctionName=self.lambda_name)
5✔
1288
            except botocore.exceptions.ClientError:  # pragma: no cover
1289
                click.echo(
1290
                    click.style("Function does not exist", fg="yellow")
1291
                    + ", please "
1292
                    + click.style("deploy", bold=True)
1293
                    + "first. Ex:"
1294
                    + click.style("zappa deploy {}.".format(self.api_stage), bold=True)
1295
                )
1296
                sys.exit(-1)
1297

1298
            print("Scheduling..")
5✔
1299
            self.zappa.schedule_events(
5✔
1300
                lambda_arn=function_response["Configuration"]["FunctionArn"],
1301
                lambda_name=self.lambda_name,
1302
                events=events,
1303
            )
1304

1305
        # Add async tasks SNS
1306
        if self.stage_config.get("async_source", None) == "sns" and self.stage_config.get("async_resources", True):
5✔
1307
            self.lambda_arn = self.zappa.get_lambda_function(function_name=self.lambda_name)
×
1308
            topic_arn = self.zappa.create_async_sns_topic(lambda_name=self.lambda_name, lambda_arn=self.lambda_arn)
×
1309
            click.echo("SNS Topic created: %s" % topic_arn)
×
1310

1311
        # Add async tasks DynamoDB
1312
        table_name = self.stage_config.get("async_response_table", False)
5✔
1313
        read_capacity = self.stage_config.get("async_response_table_read_capacity", 1)
5✔
1314
        write_capacity = self.stage_config.get("async_response_table_write_capacity", 1)
5✔
1315
        if table_name and self.stage_config.get("async_resources", True):
5✔
1316
            created, response_table = self.zappa.create_async_dynamodb_table(table_name, read_capacity, write_capacity)
×
1317
            if created:
×
1318
                click.echo("DynamoDB table created: %s" % table_name)
×
1319
            else:
1320
                click.echo("DynamoDB table exists: %s" % table_name)
×
1321
                provisioned_throughput = response_table["Table"]["ProvisionedThroughput"]
×
1322
                if (
×
1323
                    provisioned_throughput["ReadCapacityUnits"] != read_capacity
1324
                    or provisioned_throughput["WriteCapacityUnits"] != write_capacity
1325
                ):
1326
                    click.echo(
×
1327
                        click.style(
1328
                            "\nWarning! Existing DynamoDB table ({}) does not match configured capacity.\n".format(table_name),
1329
                            fg="red",
1330
                        )
1331
                    )
1332

1333
    def unschedule(self):
5✔
1334
        """
1335
        Given a a list of scheduled functions,
1336
        tear down their regular execution.
1337
        """
1338

1339
        # Run even if events are not defined to remove previously existing ones (thus default to []).
1340
        events = self.stage_config.get("events", [])
5✔
1341

1342
        if not isinstance(events, list):  # pragma: no cover
1343
            print("Events must be supplied as a list.")
1344
            return
1345

1346
        function_arn = None
5✔
1347
        try:
5✔
1348
            function_response = self.zappa.lambda_client.get_function(FunctionName=self.lambda_name)
5✔
1349
            function_arn = function_response["Configuration"]["FunctionArn"]
5✔
1350
        except botocore.exceptions.ClientError:  # pragma: no cover
1351
            raise ClickException(
1352
                "Function does not exist, you should deploy first. Ex: zappa deploy {}. "
1353
                "Proceeding to unschedule CloudWatch based events.".format(self.api_stage)
1354
            )
1355

1356
        print("Unscheduling..")
5✔
1357
        self.zappa.unschedule_events(
5✔
1358
            lambda_name=self.lambda_name,
1359
            lambda_arn=function_arn,
1360
            events=events,
1361
        )
1362

1363
        # Remove async task SNS
1364
        if self.stage_config.get("async_source", None) == "sns" and self.stage_config.get("async_resources", True):
5✔
1365
            removed_arns = self.zappa.remove_async_sns_topic(self.lambda_name)
×
1366
            click.echo("SNS Topic removed: %s" % ", ".join(removed_arns))
×
1367

1368
    def invoke(self, function_name, raw_python=False, command=None, no_color=False):
5✔
1369
        """
1370
        Invoke a remote function.
1371
        """
1372

1373
        # There are three likely scenarios for 'command' here:
1374
        #   command, which is a modular function path
1375
        #   raw_command, which is a string of python to execute directly
1376
        #   manage, which is a Django-specific management command invocation
1377
        key = command if command is not None else "command"
×
1378
        if raw_python:
×
1379
            command = {"raw_command": function_name}
×
1380
        else:
1381
            command = {key: function_name}
×
1382

1383
        # Can't use hjson
1384
        import json as json
×
1385

1386
        response = self.zappa.invoke_lambda_function(
×
1387
            self.lambda_name,
1388
            json.dumps(command),
1389
            invocation_type="RequestResponse",
1390
        )
1391

1392
        print(self.format_lambda_response(response, not no_color))
×
1393

1394
        # For a successful request FunctionError is not in response.
1395
        # https://github.com/Miserlou/Zappa/pull/1254/
1396
        if "FunctionError" in response:
×
1397
            raise ClickException("{} error occurred while invoking command.".format(response["FunctionError"]))
×
1398

1399
    def format_lambda_response(self, response, colorize=True):
5✔
1400
        if "LogResult" in response:
5✔
1401
            logresult_bytes = base64.b64decode(response["LogResult"])
5✔
1402
            try:
5✔
1403
                decoded = logresult_bytes.decode()
5✔
1404
            except UnicodeDecodeError:
5✔
1405
                return logresult_bytes
5✔
1406
            else:
1407
                if colorize and sys.stdout.isatty():
5✔
1408
                    formatted = self.format_invoke_command(decoded)
5✔
1409
                    return self.colorize_invoke_command(formatted)
5✔
1410
                else:
1411
                    return decoded
5✔
1412
        else:
1413
            return response
5✔
1414

1415
    def format_invoke_command(self, string):
5✔
1416
        """
1417
        Formats correctly the string output from the invoke() method,
1418
        replacing line breaks and tabs when necessary.
1419
        """
1420

1421
        string = string.replace("\\n", "\n")
5✔
1422

1423
        formated_response = ""
5✔
1424
        for line in string.splitlines():
5✔
1425
            if line.startswith("REPORT"):
5✔
1426
                line = line.replace("\t", "\n")
5✔
1427
            if line.startswith("[DEBUG]"):
5✔
1428
                line = line.replace("\t", " ")
5✔
1429
            formated_response += line + "\n"
5✔
1430
        formated_response = formated_response.replace("\n\n", "\n")
5✔
1431

1432
        return formated_response
5✔
1433

1434
    def colorize_invoke_command(self, string):
5✔
1435
        """
1436
        Apply various heuristics to return a colorized version the invoke
1437
        command string. If these fail, simply return the string in plaintext.
1438
        Inspired by colorize_log_entry().
1439
        """
1440

1441
        final_string = string
5✔
1442

1443
        try:
5✔
1444
            # Line headers
1445
            try:
5✔
1446
                for token in ["START", "END", "REPORT", "[DEBUG]"]:
5✔
1447
                    if token in final_string:
5✔
1448
                        format_string = "[{}]"
5✔
1449
                        # match whole words only
1450
                        pattern = r"\b{}\b"
5✔
1451
                        if token == "[DEBUG]":
5✔
1452
                            format_string = "{}"
5✔
1453
                            pattern = re.escape(token)
5✔
1454
                        repl = click.style(format_string.format(token), bold=True, fg="cyan")
5✔
1455
                        final_string = re.sub(pattern.format(token), repl, final_string)
5✔
1456
            except Exception:  # pragma: no cover
1457
                pass
1458

1459
            # Green bold Tokens
1460
            try:
5✔
1461
                for token in [
5✔
1462
                    "Zappa Event:",
1463
                    "RequestId:",
1464
                    "Version:",
1465
                    "Duration:",
1466
                    "Billed",
1467
                    "Memory Size:",
1468
                    "Max Memory Used:",
1469
                ]:
1470
                    if token in final_string:
5✔
1471
                        final_string = final_string.replace(token, click.style(token, bold=True, fg="green"))
5✔
1472
            except Exception:  # pragma: no cover
1473
                pass
1474

1475
            # UUIDs
1476
            for token in final_string.replace("\t", " ").split(" "):
5✔
1477
                try:
5✔
1478
                    if token.count("-") == 4 and token.replace("-", "").isalnum():
5✔
1479
                        final_string = final_string.replace(token, click.style(token, fg="magenta"))
5✔
1480
                except Exception:  # pragma: no cover
1481
                    pass
1482

1483
            return final_string
5✔
1484
        except Exception:
×
1485
            return string
×
1486

1487
    def status(self, return_json=False):
5✔
1488
        """
1489
        Describe the status of the current deployment.
1490
        """
1491

1492
        def tabular_print(title, value):
5✔
1493
            """
1494
            Convenience function for priting formatted table items.
1495
            """
1496
            click.echo("%-*s%s" % (32, click.style("\t" + title, fg="green") + ":", str(value)))
5✔
1497
            return
5✔
1498

1499
        # Lambda Env Details
1500
        lambda_versions = self.zappa.get_lambda_function_versions(self.lambda_name)
5✔
1501

1502
        if not lambda_versions:
5✔
1503
            raise ClickException(
×
1504
                click.style(
1505
                    "No Lambda %s detected in %s - have you deployed yet?" % (self.lambda_name, self.zappa.aws_region),
1506
                    fg="red",
1507
                )
1508
            )
1509

1510
        status_dict = collections.OrderedDict()
5✔
1511
        status_dict["Lambda Versions"] = len(lambda_versions)
5✔
1512
        function_response = self.zappa.lambda_client.get_function(FunctionName=self.lambda_name)
5✔
1513
        conf = function_response["Configuration"]
5✔
1514
        self.lambda_arn = conf["FunctionArn"]
5✔
1515
        status_dict["Lambda Name"] = self.lambda_name
5✔
1516
        status_dict["Lambda ARN"] = self.lambda_arn
5✔
1517
        status_dict["Lambda Role ARN"] = conf["Role"]
5✔
1518
        status_dict["Lambda Code Size"] = conf["CodeSize"]
5✔
1519
        status_dict["Lambda Version"] = conf["Version"]
5✔
1520
        status_dict["Lambda Last Modified"] = conf["LastModified"]
5✔
1521
        status_dict["Lambda Memory Size"] = conf["MemorySize"]
5✔
1522
        status_dict["Lambda Timeout"] = conf["Timeout"]
5✔
1523
        # Handler & Runtime won't be present for lambda Docker deployments
1524
        # https://github.com/Miserlou/Zappa/issues/2188
1525
        status_dict["Lambda Handler"] = conf.get("Handler", "")
5✔
1526
        status_dict["Lambda Runtime"] = conf.get("Runtime", "")
5✔
1527
        if "VpcConfig" in conf.keys():
5✔
1528
            status_dict["Lambda VPC ID"] = conf.get("VpcConfig", {}).get("VpcId", "Not assigned")
×
1529
        else:
1530
            status_dict["Lambda VPC ID"] = None
5✔
1531

1532
        # Calculated statistics
1533
        try:
5✔
1534
            function_invocations = self.zappa.cloudwatch.get_metric_statistics(
5✔
1535
                Namespace="AWS/Lambda",
1536
                MetricName="Invocations",
1537
                StartTime=datetime.now(timezone.utc) - timedelta(days=1),
1538
                EndTime=datetime.now(timezone.utc),
1539
                Period=1440,
1540
                Statistics=["Sum"],
1541
                Dimensions=[{"Name": "FunctionName", "Value": "{}".format(self.lambda_name)}],
1542
            )["Datapoints"][0]["Sum"]
1543
        except Exception:
×
1544
            function_invocations = 0
×
1545
        try:
5✔
1546
            function_errors = self.zappa.cloudwatch.get_metric_statistics(
5✔
1547
                Namespace="AWS/Lambda",
1548
                MetricName="Errors",
1549
                StartTime=datetime.now(timezone.utc) - timedelta(days=1),
1550
                EndTime=datetime.now(timezone.utc),
1551
                Period=1440,
1552
                Statistics=["Sum"],
1553
                Dimensions=[{"Name": "FunctionName", "Value": "{}".format(self.lambda_name)}],
1554
            )["Datapoints"][0]["Sum"]
1555
        except Exception:
×
1556
            function_errors = 0
×
1557

1558
        try:
5✔
1559
            error_rate = "{0:.2f}%".format(function_errors / function_invocations * 100)
5✔
1560
        except Exception:
×
1561
            error_rate = "Error calculating"
×
1562
        status_dict["Invocations (24h)"] = int(function_invocations)
5✔
1563
        status_dict["Errors (24h)"] = int(function_errors)
5✔
1564
        status_dict["Error Rate (24h)"] = error_rate
5✔
1565

1566
        # URLs
1567
        if self.use_apigateway:
5✔
1568
            api_url = self.zappa.get_api_url(self.lambda_name, self.api_stage)
5✔
1569

1570
            status_dict["API Gateway URL"] = api_url
5✔
1571

1572
            # Api Keys
1573
            api_id = self.zappa.get_api_id(self.lambda_name)
5✔
1574
            for api_key in self.zappa.get_api_keys(api_id, self.api_stage):
5✔
1575
                status_dict["API Gateway x-api-key"] = api_key
5✔
1576

1577
            # There literally isn't a better way to do this.
1578
            # AWS provides no way to tie a APIGW domain name to its Lambda function.
1579
            domain_url = self.stage_config.get("domain", None)
5✔
1580
            base_path = self.stage_config.get("base_path", None)
5✔
1581
            if domain_url:
5✔
1582
                status_dict["Domain URL"] = "https://" + domain_url
×
1583
                if base_path:
×
1584
                    status_dict["Domain URL"] += "/" + base_path
×
1585
            else:
1586
                status_dict["Domain URL"] = "None Supplied"
5✔
1587

1588
        # Scheduled Events
1589
        event_rules = self.zappa.get_event_rules_for_lambda(lambda_arn=self.lambda_arn)
5✔
1590
        status_dict["Num. Event Rules"] = len(event_rules)
5✔
1591
        if len(event_rules) > 0:
5✔
1592
            status_dict["Events"] = []
5✔
1593
        for rule in event_rules:
5✔
1594
            event_dict = {}
5✔
1595
            rule_name = rule["Name"]
5✔
1596
            event_dict["Event Rule Name"] = rule_name
5✔
1597
            event_dict["Event Rule Schedule"] = rule.get("ScheduleExpression", None)
5✔
1598
            event_dict["Event Rule State"] = rule.get("State", None).title()
5✔
1599
            event_dict["Event Rule ARN"] = rule.get("Arn", None)
5✔
1600
            status_dict["Events"].append(event_dict)
5✔
1601

1602
        if return_json:
5✔
1603
            # Putting the status in machine readable format
1604
            # https://github.com/Miserlou/Zappa/issues/407
1605
            print(json.dumpsJSON(status_dict))
×
1606
        else:
1607
            click.echo("Status for " + click.style(self.lambda_name, bold=True) + ": ")
5✔
1608
            for k, v in status_dict.items():
5✔
1609
                if k == "Events":
5✔
1610
                    # Events are a list of dicts
1611
                    for event in v:
5✔
1612
                        for item_k, item_v in event.items():
5✔
1613
                            tabular_print(item_k, item_v)
5✔
1614
                else:
1615
                    tabular_print(k, v)
5✔
1616

1617
        # TODO: S3/SQS/etc. type events?
1618

1619
        return True
5✔
1620

1621
    def check_stage_name(self, stage_name):
5✔
1622
        """
1623
        Make sure the stage name matches the AWS-allowed pattern
1624
        (calls to apigateway_client.create_deployment, will fail with error
1625
        message "ClientError: An error occurred (BadRequestException) when
1626
        calling the CreateDeployment operation: Stage name only allows
1627
        a-zA-Z0-9_" if the pattern does not match)
1628
        """
1629
        if not self.use_apigateway:
5✔
1630
            return True
×
1631
        if self.stage_name_env_pattern.match(stage_name):
5✔
1632
            return True
×
1633
        raise ValueError("API stage names must match a-zA-Z0-9_ ; '{0!s}' does not.".format(stage_name))
5✔
1634

1635
    def check_environment(self, environment):
5✔
1636
        """
1637
        Make sure the environment contains only strings
1638
        (since putenv needs a string)
1639
        """
1640

1641
        non_strings = []
5✔
1642
        for k, v in environment.items():
5✔
1643
            if not isinstance(v, str):
5✔
1644
                non_strings.append(k)
5✔
1645
        if non_strings:
5✔
1646
            raise ValueError("The following environment variables are not strings: {}".format(", ".join(non_strings)))
5✔
1647
        else:
1648
            return True
5✔
1649

1650
    def _get_init_env(self) -> str:
5✔
1651
        # Create Env
NEW
1652
        while True:
×
NEW
1653
            click.echo(
×
1654
                "Your Zappa configuration can support multiple production stages, like '"
1655
                + click.style("dev", bold=True)
1656
                + "', '"
1657
                + click.style("staging", bold=True)
1658
                + "', and '"
1659
                + click.style("production", bold=True)
1660
                + "'."
1661
            )
NEW
1662
            env = input("What do you want to call this environment (default 'dev'): ") or "dev"
×
NEW
1663
            try:
×
NEW
1664
                self.check_stage_name(env)
×
NEW
1665
                break
×
NEW
1666
            except ValueError:
×
NEW
1667
                click.echo(click.style("Stage names must match a-zA-Z0-9_", fg="red"))
×
NEW
1668
        return env
×
1669

1670
    def _get_init_profile(self, default_profile: str, profiles: dict, profile_names: list) -> tuple[str, dict]:
5✔
NEW
1671
        while True:
×
NEW
1672
            profile_name = (
×
1673
                input(
1674
                    "We found the following profiles: {}, and {}. "
1675
                    "Which would you like us to use? (default '{}'): ".format(
1676
                        ", ".join(profile_names[:-1]),
1677
                        profile_names[-1],
1678
                        default_profile,
1679
                    )
1680
                )
1681
                or default_profile
1682
            )
NEW
1683
            if profile_name in profiles:
×
NEW
1684
                profile = profiles[profile_name]
×
NEW
1685
                break
×
1686
            else:
NEW
1687
                click.echo("Please enter a valid name for your AWS profile.")
×
NEW
1688
        return profile_name, profile
×
1689

1690
    def _get_init_bucket(self, default_bucket: str) -> str:
5✔
NEW
1691
        while True:
×
NEW
1692
            bucket = input("What do you want to call your bucket? (default '%s'): " % default_bucket) or default_bucket
×
1693

NEW
1694
            if is_valid_bucket_name(bucket):
×
NEW
1695
                break
×
1696

NEW
1697
            click.echo(click.style("Invalid bucket name!", bold=True))
×
NEW
1698
            click.echo("S3 buckets must be named according to the following rules:")
×
NEW
1699
            click.echo(
×
1700
                """* Bucket names must be unique across all existing bucket names in Amazon S3.
1701
* Bucket names must comply with DNS naming conventions.
1702
* Bucket names must be at least 3 and no more than 63 characters long.
1703
* Bucket names must not contain uppercase characters or underscores.
1704
* Bucket names must start with a lowercase letter or number.
1705
* Bucket names must be a series of one or more labels. Adjacent labels are separated
1706
  by a single period (.). Bucket names can contain lowercase letters, numbers, and
1707
  hyphens. Each label must start and end with a lowercase letter or a number.
1708
* Bucket names must not be formatted as an IP address (for example, 192.168.5.4).
1709
* When you use virtual hosted–style buckets with Secure Sockets Layer (SSL), the SSL
1710
  wildcard certificate only matches buckets that don't contain periods. To work around
1711
  this, use HTTP or write your own certificate verification logic. We recommend that
1712
  you do not use periods (".") in bucket names when using virtual hosted–style buckets.
1713
"""
1714
            )
NEW
1715
        return bucket
×
1716

1717
    def _get_init_django_settings(self, matches: list) -> str:  # type: ignore
5✔
NEW
1718
        django_settings = None
×
NEW
1719
        while django_settings in [None, ""]:
×
NEW
1720
            if matches:
×
NEW
1721
                click.echo(
×
1722
                    "We discovered: "
1723
                    + click.style(
1724
                        ", ".join("{}".format(i) for v, i in enumerate(matches)),
1725
                        bold=True,
1726
                    )
1727
                )
NEW
1728
                django_settings = input("Where are your project's settings? (default '%s'): " % matches[0]) or matches[0]
×
1729
            else:
NEW
1730
                click.echo("(This will likely be something like 'your_project.settings')")
×
NEW
1731
                django_settings = input("Where are your project's settings?: ")
×
NEW
1732
        return django_settings  # type: ignore
×
1733

1734
    def _get_init_app_function(self, matches: list[str]) -> str:  # type: ignore
5✔
1735
        """Get the flask app function from the user."""
NEW
1736
        app_function = None
×
NEW
1737
        while app_function in [None, ""]:
×
NEW
1738
            if matches:
×
NEW
1739
                click.echo(
×
1740
                    "We discovered: "
1741
                    + click.style(
1742
                        ", ".join("{}".format(i) for v, i in enumerate(matches)),
1743
                        bold=True,
1744
                    )
1745
                )
NEW
1746
                app_function = input("Where is your app's function? (default '%s'): " % matches[0]) or matches[0]
×
1747
            else:
NEW
1748
                app_function = input("Where is your app's function?: ")
×
NEW
1749
        return app_function  # type: ignore
×
1750

1751
    def _get_init_global_settings(self) -> tuple[str, bool]:
5✔
NEW
1752
        global_deployment = False
×
NEW
1753
        while True:
×
NEW
1754
            global_type = input(
×
1755
                "Would you like to deploy this application "
1756
                + click.style("globally", bold=True)
1757
                + "? (default 'n') [y/n/(p)rimary]: "
1758
            )
NEW
1759
            if not global_type:
×
NEW
1760
                break
×
NEW
1761
            if global_type.lower() in ["y", "yes", "p", "primary"]:
×
NEW
1762
                global_deployment = True
×
NEW
1763
                break
×
NEW
1764
            if global_type.lower() in ["n", "no"]:
×
NEW
1765
                global_deployment = False
×
NEW
1766
                break
×
NEW
1767
        return global_type, global_deployment
×
1768

1769
    def _get_init_confirm(self) -> str:
5✔
NEW
1770
        confirm = input("\nDoes this look " + click.style("okay", bold=True, fg="green") + "? (default 'y') [y/n]: ") or "yes"
×
NEW
1771
        return confirm
×
1772

1773
    def init(self, settings_file: str = "zappa_settings.json"):
5✔
1774
        """
1775
        Initialize a new Zappa project by creating a new zappa_settings.json in a guided process.
1776
        This should probably be broken up into few separate componants once it's stable.
1777
        Testing these inputs requires monkeypatching with mock, which isn't pretty.
1778
        """
1779

1780
        # Make sure we're in a venv.
1781
        self.check_venv()
5✔
1782

1783
        # Ensure that we don't already have a zappa_settings file.
1784
        settings_file_filepath = Path(settings_file).resolve()
5✔
1785
        if settings_file_filepath.exists() and settings_file_filepath.is_file():
5✔
UNCOV
1786
            raise ClickException(
×
1787
                "This project already has a " + click.style("{0!s} file".format(settings_file), fg="red", bold=True) + "!"
1788
            )
1789

1790
        # Explain system.
1791
        click.echo(
5✔
1792
            click.style(
1793
                """\n███████╗ █████╗ ██████╗ ██████╗  █████╗
1794
╚══███╔╝██╔══██╗██╔══██╗██╔══██╗██╔══██╗
1795
  ███╔╝ ███████║██████╔╝██████╔╝███████║
1796
 ███╔╝  ██╔══██║██╔═══╝ ██╔═══╝ ██╔══██║
1797
███████╗██║  ██║██║     ██║     ██║  ██║
1798
╚══════╝╚═╝  ╚═╝╚═╝     ╚═╝     ╚═╝  ╚═╝\n""",
1799
                fg="green",
1800
                bold=True,
1801
            )
1802
        )
1803

1804
        click.echo(
5✔
1805
            click.style("Welcome to ", bold=True) + click.style("Zappa", fg="green", bold=True) + click.style("!\n", bold=True)
1806
        )
1807
        click.echo(
5✔
1808
            click.style("Zappa", bold=True) + " is a system for running server-less Python web applications"
1809
            " on AWS Lambda and AWS API Gateway."
1810
        )
1811
        click.echo("This `init` command will help you create and configure your new Zappa deployment.")
5✔
1812
        click.echo("Let's get started!\n")
5✔
1813

1814
        env = self._get_init_env()
5✔
1815

1816
        # Detect AWS profiles and regions
1817
        # If anyone knows a more straightforward way to easily detect and
1818
        # parse AWS profiles I'm happy to change this, feels like a hack
1819
        session = botocore.session.Session()
5✔
1820
        config = session.full_config
5✔
1821
        profiles = config.get("profiles", {})
5✔
1822
        profile_names = list(profiles.keys())
5✔
1823

1824
        click.echo(
5✔
1825
            "\nAWS Lambda and API Gateway are only available in certain regions. "
1826
            "Let's check to make sure you have a profile set up in one that will work."
1827
        )
1828

1829
        if not profile_names:
5✔
1830
            profile_name, profile = None, None
5✔
1831
            click.echo(
5✔
1832
                "We couldn't find an AWS profile to use. "
1833
                "Before using Zappa, you'll need to set one up. See here for more info: {}".format(
1834
                    click.style(BOTO3_CONFIG_DOCS_URL, fg="blue", underline=True)
1835
                )
1836
            )
1837
        elif len(profile_names) == 1:
×
1838
            profile_name = profile_names[0]
×
1839
            profile = profiles[profile_name]
×
1840
            click.echo("Okay, using profile {}!".format(click.style(profile_name, bold=True)))
×
1841
        else:
1842
            if "default" in profile_names:
×
1843
                default_profile = [p for p in profile_names if p == "default"][0]
×
1844
            else:
1845
                default_profile = profile_names[0]
×
1846

NEW
1847
            profile_name, profile = self._get_init_profile(default_profile, profiles, profile_names)
×
1848

1849
        profile_region = profile.get("region") if profile else None
5✔
1850

1851
        # Create Bucket
1852
        click.echo(
5✔
1853
            "\nYour Zappa deployments will need to be uploaded to a " + click.style("private S3 bucket", bold=True) + "."
1854
        )
1855
        click.echo("If you don't have a bucket yet, we'll create one for you too.")
5✔
1856
        default_bucket = "zappa-" + "".join(random.choice(string.ascii_lowercase + string.digits) for _ in range(9))
5✔
1857
        bucket = self._get_init_bucket(default_bucket)
5✔
1858

1859
        # Detect Django/Flask
1860
        try:  # pragma: no cover
1861
            import django  # noqa: F401
1862

1863
            has_django = True
1864
        except ImportError:
×
1865
            has_django = False
×
1866

1867
        try:  # pragma: no cover
1868
            import flask  # noqa: F401
1869

1870
            has_flask = True
1871
        except ImportError:
×
1872
            has_flask = False
×
1873

1874
        print("")
5✔
1875
        # App-specific
1876
        if has_django:  # pragma: no cover
1877
            click.echo("It looks like this is a " + click.style("Django", bold=True) + " application!")
1878
            click.echo("What is the " + click.style("module path", bold=True) + " to your projects's Django settings?")
1879
            matches = detect_django_settings()
1880
            django_settings = self._get_init_django_settings(matches)
1881
            django_settings = django_settings.replace("'", "")
1882
            django_settings = django_settings.replace('"', "")
1883
        else:
1884
            matches = None
×
1885
            if has_flask:
×
1886
                click.echo("It looks like this is a " + click.style("Flask", bold=True) + " application.")
×
1887
                matches = detect_flask_apps()
×
1888
            click.echo("What's the " + click.style("modular path", bold=True) + " to your app's function?")
×
1889
            click.echo("This will likely be something like 'your_module.app'.")
×
NEW
1890
            app_function = self._get_init_app_function(matches)
×
1891
            app_function = app_function.replace("'", "")
×
1892
            app_function = app_function.replace('"', "")
×
1893

1894
        # Globalize
1895
        click.echo(
5✔
1896
            "\nYou can optionally deploy to "
1897
            + click.style("all available regions", bold=True)
1898
            + " in order to provide fast global service."
1899
        )
1900
        click.echo("If you are using Zappa for the first time, you probably don't want to do this!")
5✔
1901
        global_type, global_deployment = self._get_init_global_settings()
5✔
1902

1903
        # The given environment name
1904
        zappa_settings = {
5✔
1905
            env: {
1906
                "profile_name": profile_name,
1907
                "s3_bucket": bucket,
1908
                "runtime": get_venv_from_python_version(),
1909
                "project_name": self.get_project_name(),
1910
                "exclude": ["boto3", "dateutil", "botocore", "s3transfer", "concurrent"],
1911
            }
1912
        }
1913

1914
        if profile_region:
5✔
1915
            zappa_settings[env]["aws_region"] = profile_region
×
1916

1917
        if has_django:
5✔
1918
            zappa_settings[env]["django_settings"] = django_settings
5✔
1919
        else:
1920
            zappa_settings[env]["app_function"] = app_function
×
1921

1922
        # Global Region Deployment
1923
        if global_deployment:
5✔
1924
            additional_regions = [r for r in API_GATEWAY_REGIONS if r != profile_region]
×
1925
            # Create additional stages
1926
            if global_type.lower() in ["p", "primary"]:
×
1927
                additional_regions = [r for r in additional_regions if "-1" in r]
×
1928

1929
            for region in additional_regions:
×
1930
                env_name = env + "_" + region.replace("-", "_")
×
1931
                g_env = {env_name: {"extends": env, "aws_region": region}}
×
1932
                zappa_settings.update(g_env)
×
1933

1934
        import json as json  # hjson is fine for loading, not fine for writing.
5✔
1935

1936
        zappa_settings_json = json.dumps(zappa_settings, sort_keys=True, indent=4)
5✔
1937

1938
        click.echo("\nOkay, here's your " + click.style("zappa_settings.json", bold=True) + ":\n")
5✔
1939
        click.echo(click.style(zappa_settings_json, fg="yellow", bold=False))
5✔
1940

1941
        confirm = self._get_init_confirm()
5✔
1942
        if confirm[0] not in ["y", "Y", "yes", "YES"]:
5✔
1943
            click.echo("" + click.style("Sorry", bold=True, fg="red") + " to hear that! Please init again.")
×
1944
            return
×
1945

1946
        # Write
1947
        with settings_file_filepath.open("w", encoding="utf8") as zappa_settings_file:
5✔
1948
            zappa_settings_file.write(zappa_settings_json)
5✔
1949

1950
        if global_deployment:
5✔
1951
            click.echo(
×
1952
                "\n"
1953
                + click.style("Done", bold=True)
1954
                + "! You can also "
1955
                + click.style("deploy all", bold=True)
1956
                + " by executing:\n"
1957
            )
1958
            click.echo(click.style("\t$ zappa deploy --all", bold=True))
×
1959

1960
            click.echo("\nAfter that, you can " + click.style("update", bold=True) + " your application code with:\n")
×
1961
            click.echo(click.style("\t$ zappa update --all", bold=True))
×
1962
        else:
1963
            click.echo(
5✔
1964
                "\n"
1965
                + click.style("Done", bold=True)
1966
                + "! Now you can "
1967
                + click.style("deploy", bold=True)
1968
                + " your Zappa application by executing:\n"
1969
            )
1970
            click.echo(click.style("\t$ zappa deploy %s" % env, bold=True))
5✔
1971

1972
            click.echo("\nAfter that, you can " + click.style("update", bold=True) + " your application code with:\n")
5✔
1973
            click.echo(click.style("\t$ zappa update %s" % env, bold=True))
5✔
1974

1975
        click.echo(
5✔
1976
            "\nTo learn more, check out our project page on "
1977
            + click.style("GitHub", bold=True)
1978
            + " here: "
1979
            + click.style("https://github.com/Zappa/Zappa", fg="cyan", bold=True)
1980
        )
1981
        click.echo(
5✔
1982
            "and stop by our "
1983
            + click.style("Slack", bold=True)
1984
            + " channel here: "
1985
            + click.style("https://zappateam.slack.com", fg="cyan", bold=True)
1986
        )
1987
        click.echo("\nEnjoy!,")
5✔
1988
        click.echo(" ~ Team " + click.style("Zappa", bold=True) + "!")
5✔
1989

1990
        return
5✔
1991

1992
    def certify(self, no_confirm=True, manual=False):
5✔
1993
        """
1994
        Register or update a domain certificate for this env.
1995
        """
1996

1997
        if not self.domain:
5✔
1998
            raise ClickException(
×
1999
                "Can't certify a domain without " + click.style("domain", fg="red", bold=True) + " configured!"
2000
            )
2001

2002
        if not no_confirm:  # pragma: no cover
2003
            confirm = input("Are you sure you want to certify? [y/n] ")
2004
            if confirm != "y":
2005
                return
2006

2007
        # Make sure this isn't already deployed.
2008
        deployed_versions = self.zappa.get_lambda_function_versions(self.lambda_name)
5✔
2009
        if len(deployed_versions) == 0:
5✔
2010
            raise ClickException(
5✔
2011
                "This application "
2012
                + click.style("isn't deployed yet", fg="red")
2013
                + " - did you mean to call "
2014
                + click.style("deploy", bold=True)
2015
                + "?"
2016
            )
2017

2018
        account_key_location = self.stage_config.get("lets_encrypt_key", None)
5✔
2019
        cert_location = self.stage_config.get("certificate", None)
5✔
2020
        cert_key_location = self.stage_config.get("certificate_key", None)
5✔
2021
        cert_chain_location = self.stage_config.get("certificate_chain", None)
5✔
2022
        cert_arn = self.stage_config.get("certificate_arn", None)
5✔
2023
        base_path = self.stage_config.get("base_path", None)
5✔
2024

2025
        # These are sensitive
2026
        certificate_body = None
5✔
2027
        certificate_private_key = None
5✔
2028
        certificate_chain = None
5✔
2029

2030
        # Prepare for custom Let's Encrypt
2031
        if not cert_location and not cert_arn:
5✔
2032
            if not account_key_location:
5✔
2033
                raise ClickException(
5✔
2034
                    "Can't certify a domain without "
2035
                    + click.style("lets_encrypt_key", fg="red", bold=True)
2036
                    + " or "
2037
                    + click.style("certificate", fg="red", bold=True)
2038
                    + " or "
2039
                    + click.style("certificate_arn", fg="red", bold=True)
2040
                    + " configured!"
2041
                )
2042

2043
            # Get install account_key to /tmp/account_key.pem
2044
            from .letsencrypt import gettempdir
×
2045

2046
            if account_key_location.startswith("s3://"):
×
2047
                bucket, key_name = parse_s3_url(account_key_location)
×
2048
                self.zappa.s3_client.download_file(bucket, key_name, os.path.join(gettempdir(), "account.key"))
×
2049
            else:
2050
                from shutil import copyfile
×
2051

2052
                copyfile(account_key_location, os.path.join(gettempdir(), "account.key"))
×
2053

2054
        # Prepare for Custom SSL
2055
        elif not account_key_location and not cert_arn:
5✔
2056
            if not cert_location or not cert_key_location or not cert_chain_location:
5✔
2057
                raise ClickException(
5✔
2058
                    "Can't certify a domain without "
2059
                    + click.style(
2060
                        "certificate, certificate_key and certificate_chain",
2061
                        fg="red",
2062
                        bold=True,
2063
                    )
2064
                    + " configured!"
2065
                )
2066

2067
            # Read the supplied certificates.
2068
            with open(cert_location) as f:
5✔
2069
                certificate_body = f.read()
5✔
2070

2071
            with open(cert_key_location) as f:
5✔
2072
                certificate_private_key = f.read()
5✔
2073

2074
            with open(cert_chain_location) as f:
5✔
2075
                certificate_chain = f.read()
5✔
2076

2077
        click.echo("Certifying domain " + click.style(self.domain, fg="green", bold=True) + "..")
5✔
2078

2079
        # Get cert and update domain.
2080

2081
        # Let's Encrypt
2082
        if not cert_location and not cert_arn:
5✔
2083
            from .letsencrypt import get_cert_and_update_domain
×
2084

2085
            cert_success = get_cert_and_update_domain(self.zappa, self.lambda_name, self.api_stage, self.domain, manual)
×
2086

2087
        # Custom SSL / ACM
2088
        else:
2089
            route53 = self.stage_config.get("route53_enabled", True)
5✔
2090
            if not self.zappa.get_domain_name(self.domain, route53=route53):
5✔
2091
                dns_name = self.zappa.create_domain_name(
5✔
2092
                    domain_name=self.domain,
2093
                    certificate_name=self.domain + "-Zappa-Cert",
2094
                    certificate_body=certificate_body,
2095
                    certificate_private_key=certificate_private_key,
2096
                    certificate_chain=certificate_chain,
2097
                    certificate_arn=cert_arn,
2098
                    lambda_name=self.lambda_name,
2099
                    stage=self.api_stage,
2100
                    base_path=base_path,
2101
                )
2102
                if route53:
5✔
2103
                    self.zappa.update_route53_records(self.domain, dns_name)
5✔
2104
                print(
5✔
2105
                    "Created a new domain name with supplied certificate. "
2106
                    "Please note that it can take up to 40 minutes for this domain to be "
2107
                    "created and propagated through AWS, but it requires no further work on your part."
2108
                )
2109
            else:
2110
                self.zappa.update_domain_name(
5✔
2111
                    domain_name=self.domain,
2112
                    certificate_name=self.domain + "-Zappa-Cert",
2113
                    certificate_body=certificate_body,
2114
                    certificate_private_key=certificate_private_key,
2115
                    certificate_chain=certificate_chain,
2116
                    certificate_arn=cert_arn,
2117
                    lambda_name=self.lambda_name,
2118
                    stage=self.api_stage,
2119
                    route53=route53,
2120
                    base_path=base_path,
2121
                )
2122

2123
            cert_success = True
5✔
2124

2125
        if cert_success:
5✔
2126
            click.echo("Certificate " + click.style("updated", fg="green", bold=True) + "!")
5✔
2127
        else:
2128
            click.echo(click.style("Failed", fg="red", bold=True) + " to generate or install certificate! :(")
×
2129
            click.echo("\n==============\n")
×
2130
            shamelessly_promote()
×
2131

2132
    ##
2133
    # Shell
2134
    ##
2135
    def shell(self):
5✔
2136
        """
2137
        Spawn a debug shell.
2138
        """
2139
        click.echo(
×
2140
            click.style("NOTICE!", fg="yellow", bold=True)
2141
            + " This is a "
2142
            + click.style("local", fg="green", bold=True)
2143
            + " shell, inside a "
2144
            + click.style("Zappa", bold=True)
2145
            + " object!"
2146
        )
2147
        self.zappa.shell()
×
2148
        return
×
2149

2150
    ##
2151
    # Utility
2152
    ##
2153

2154
    def callback(self, position):
5✔
2155
        """
2156
        Allows the execution of custom code between creation of the zip file and deployment to AWS.
2157
        :return: None
2158
        """
2159

2160
        callbacks = self.stage_config.get("callbacks", {})
5✔
2161
        callback = callbacks.get(position)
5✔
2162

2163
        if callback:
5✔
2164
            (mod_path, cb_func_name) = callback.rsplit(".", 1)
5✔
2165

2166
            try:  # Prefer callback in working directory
5✔
2167
                if mod_path.count(".") >= 1:  # Callback function is nested in a folder
5✔
2168
                    (mod_folder_path, mod_name) = mod_path.rsplit(".", 1)
5✔
2169
                    mod_folder_path_fragments = mod_folder_path.split(".")
5✔
2170
                    working_dir = os.path.join(os.getcwd(), *mod_folder_path_fragments)
5✔
2171
                else:
2172
                    mod_name = mod_path
5✔
2173
                    working_dir = os.getcwd()
5✔
2174

2175
                working_dir_importer = pkgutil.get_importer(working_dir)
5✔
2176
                module_ = working_dir_importer.find_spec(mod_name).loader.load_module(mod_name)
5✔
2177

2178
            except (ImportError, AttributeError):
×
2179
                try:  # Callback func might be in virtualenv
×
2180
                    module_ = importlib.import_module(mod_path)
×
2181
                except ImportError:  # pragma: no cover
2182
                    raise ClickException(
2183
                        click.style("Failed ", fg="red")
2184
                        + "to "
2185
                        + click.style(
2186
                            "import {position} callback ".format(position=position),
2187
                            bold=True,
2188
                        )
2189
                        + 'module: "{mod_path}"'.format(mod_path=click.style(mod_path, bold=True))
2190
                    )
2191

2192
            if not hasattr(module_, cb_func_name):  # pragma: no cover
2193
                raise ClickException(
2194
                    click.style("Failed ", fg="red")
2195
                    + "to "
2196
                    + click.style("find {position} callback ".format(position=position), bold=True)
2197
                    + 'function: "{cb_func_name}" '.format(cb_func_name=click.style(cb_func_name, bold=True))
2198
                    + 'in module "{mod_path}"'.format(mod_path=mod_path)
2199
                )
2200

2201
            cb_func = getattr(module_, cb_func_name)
5✔
2202
            cb_func(self)  # Call the function passing self
5✔
2203

2204
    def check_for_update(self):
5✔
2205
        """
2206
        Print a warning if there's a new Zappa version available.
2207
        """
2208
        try:
5✔
2209
            updateable = check_new_version_available(__version__)
5✔
2210
            if updateable:
5✔
UNCOV
2211
                click.echo(
×
2212
                    click.style("Important!", fg="yellow", bold=True)
2213
                    + " A new version of "
2214
                    + click.style("Zappa", bold=True)
2215
                    + " is available!"
2216
                )
UNCOV
2217
                click.echo("Upgrade with: " + click.style("pip install zappa --upgrade", bold=True))
×
UNCOV
2218
                click.echo(
×
2219
                    "Visit the project page on GitHub to see the latest changes: "
2220
                    + click.style("https://github.com/Zappa/Zappa", bold=True)
2221
                )
2222
        except Exception as e:  # pragma: no cover
2223
            print(e)
2224
            return
2225

2226
    def load_settings(self, settings_file=None, session=None):
5✔
2227
        """
2228
        Load the local zappa_settings file.
2229
        An existing boto session can be supplied, though this is likely for testing purposes.
2230
        Returns the loaded Zappa object.
2231
        """
2232

2233
        # Ensure we're passed a valid settings file.
2234
        if not settings_file:
5✔
2235
            settings_file = self.get_json_or_yaml_settings()
×
2236
        if not os.path.isfile(settings_file):
5✔
2237
            raise ClickException("Please configure your zappa_settings file.")
×
2238

2239
        # Load up file
2240
        self.load_settings_file(settings_file)
5✔
2241

2242
        # Make sure that this stage is our settings
2243
        if self.api_stage not in self.zappa_settings.keys():
5✔
2244
            raise ClickException("Please define stage '{0!s}' in your Zappa settings.".format(self.api_stage))
×
2245

2246
        # We need a working title for this project. Use one if supplied, else cwd dirname.
2247
        if "project_name" in self.stage_config:  # pragma: no cover
2248
            # If the name is invalid, this will throw an exception with message up stack
2249
            self.project_name = validate_name(self.stage_config["project_name"])
2250
        else:
2251
            self.project_name = self.get_project_name()
5✔
2252

2253
        # The name of the actual AWS Lambda function, ex, 'helloworld-dev'
2254
        # Assume that we already have have validated the name beforehand.
2255
        # Related:  https://github.com/Miserlou/Zappa/pull/664
2256
        #           https://github.com/Miserlou/Zappa/issues/678
2257
        #           And various others from Slack.
2258
        self.lambda_name = slugify.slugify(self.project_name + "-" + self.api_stage)
5✔
2259

2260
        # Load stage-specific settings
2261
        self.s3_bucket_name = self.stage_config.get(
5✔
2262
            "s3_bucket",
2263
            "zappa-" + "".join(random.choice(string.ascii_lowercase + string.digits) for _ in range(9)),
2264
        )
2265
        self.vpc_config = self.stage_config.get("vpc_config", {})
5✔
2266
        self.memory_size = self.stage_config.get("memory_size", 512)
5✔
2267
        self.ephemeral_storage = self.stage_config.get("ephemeral_storage", {"Size": 512})
5✔
2268

2269
        # Validate ephemeral storage structure and size
2270
        if "Size" not in self.ephemeral_storage:
5✔
2271
            raise ClickException("Please provide a valid Size for ephemeral_storage in your Zappa settings.")
5✔
2272
        elif not 512 <= self.ephemeral_storage["Size"] <= 10240:
5✔
2273
            raise ClickException("Please provide a valid ephemeral_storage size between 512 - 10240 in your Zappa settings.")
5✔
2274

2275
        self.app_function = self.stage_config.get("app_function", None)
5✔
2276
        self.exception_handler = self.stage_config.get("exception_handler", None)
5✔
2277
        self.aws_region = self.stage_config.get("aws_region", None)
5✔
2278
        self.debug = self.stage_config.get("debug", True)
5✔
2279
        self.prebuild_script = self.stage_config.get("prebuild_script", None)
5✔
2280
        self.profile_name = self.stage_config.get("profile_name", None)
5✔
2281
        self.log_level = self.stage_config.get("log_level", "DEBUG")
5✔
2282
        self.domain = self.stage_config.get("domain", None)
5✔
2283
        self.base_path = self.stage_config.get("base_path", None)
5✔
2284
        self.timeout_seconds = self.stage_config.get("timeout_seconds", 30)
5✔
2285
        dead_letter_arn = self.stage_config.get("dead_letter_arn", "")
5✔
2286
        self.dead_letter_config = {"TargetArn": dead_letter_arn} if dead_letter_arn else {}
5✔
2287
        self.cognito = self.stage_config.get("cognito", None)
5✔
2288
        self.num_retained_versions = self.stage_config.get("num_retained_versions", None)
5✔
2289
        self.architecture = self.stage_config.get("architecture", "x86_64")
5✔
2290
        # Check for valid values of num_retained_versions
2291
        if self.num_retained_versions is not None and type(self.num_retained_versions) is not int:
5✔
2292
            raise ClickException(
×
2293
                "Please supply either an integer or null for num_retained_versions in the zappa_settings.json. Found %s"
2294
                % type(self.num_retained_versions)
2295
            )
2296
        elif type(self.num_retained_versions) is int and self.num_retained_versions < 1:
5✔
2297
            raise ClickException("The value for num_retained_versions in the zappa_settings.json should be greater than 0.")
×
2298

2299
        # Provide legacy support for `use_apigateway`, now `apigateway_enabled`.
2300
        # https://github.com/Miserlou/Zappa/issues/490
2301
        # https://github.com/Miserlou/Zappa/issues/493
2302
        self.use_apigateway = self.stage_config.get("use_apigateway", True)
5✔
2303
        if self.use_apigateway:
5✔
2304
            self.use_apigateway = self.stage_config.get("apigateway_enabled", True)
5✔
2305
        self.apigateway_description = self.stage_config.get("apigateway_description", None)
5✔
2306

2307
        self.lambda_handler = self.stage_config.get("lambda_handler", "handler.lambda_handler")
5✔
2308
        # DEPRECATED. https://github.com/Miserlou/Zappa/issues/456
2309
        self.remote_env_bucket = self.stage_config.get("remote_env_bucket", None)
5✔
2310
        self.remote_env_file = self.stage_config.get("remote_env_file", None)
5✔
2311
        self.remote_env = self.stage_config.get("remote_env", None)
5✔
2312
        self.settings_file = self.stage_config.get("settings_file", None)
5✔
2313
        self.django_settings = self.stage_config.get("django_settings", None)
5✔
2314
        self.manage_roles = self.stage_config.get("manage_roles", True)
5✔
2315
        self.binary_support = self.stage_config.get("binary_support", True)
5✔
2316
        self.api_key_required = self.stage_config.get("api_key_required", False)
5✔
2317
        self.api_key = self.stage_config.get("api_key")
5✔
2318
        self.endpoint_configuration = self.stage_config.get("endpoint_configuration", None)
5✔
2319
        self.iam_authorization = self.stage_config.get("iam_authorization", False)
5✔
2320
        self.cors = self.stage_config.get("cors", False)
5✔
2321
        self.lambda_description = self.stage_config.get("lambda_description", "Zappa Deployment")
5✔
2322
        self.lambda_concurrency = self.stage_config.get("lambda_concurrency", None)
5✔
2323
        self.environment_variables = self.stage_config.get("environment_variables", {})
5✔
2324
        self.aws_environment_variables = self.stage_config.get("aws_environment_variables", {})
5✔
2325
        self.check_environment(self.environment_variables)
5✔
2326
        self.authorizer = self.stage_config.get("authorizer", {})
5✔
2327
        self.runtime = self.stage_config.get("runtime", get_runtime_from_python_version())
5✔
2328
        self.aws_kms_key_arn = self.stage_config.get("aws_kms_key_arn", "")
5✔
2329
        self.snap_start = self.stage_config.get("snap_start", "None")
5✔
2330
        self.context_header_mappings = self.stage_config.get("context_header_mappings", {})
5✔
2331
        self.xray_tracing = self.stage_config.get("xray_tracing", False)
5✔
2332
        self.desired_role_arn = self.stage_config.get("role_arn")
5✔
2333
        self.layers = self.stage_config.get("layers", None)
5✔
2334
        self.additional_text_mimetypes = self.stage_config.get("additional_text_mimetypes", None)
5✔
2335

2336
        # check that BINARY_SUPPORT is True if additional_text_mimetypes is provided
2337
        if self.additional_text_mimetypes and not self.binary_support:
5✔
2338
            raise ClickException("zappa_settings.json has additional_text_mimetypes defined, but binary_support is False!")
5✔
2339

2340
        # Load ALB-related settings
2341
        self.use_alb = self.stage_config.get("alb_enabled", False)
5✔
2342
        self.alb_vpc_config = self.stage_config.get("alb_vpc_config", {})
5✔
2343

2344
        # Additional tags
2345
        self.tags = self.stage_config.get("tags", {})
5✔
2346

2347
        desired_role_name = self.lambda_name + "-ZappaLambdaExecutionRole"
5✔
2348
        self.zappa = Zappa(
5✔
2349
            boto_session=session,
2350
            profile_name=self.profile_name,
2351
            aws_region=self.aws_region,
2352
            load_credentials=self.load_credentials,
2353
            desired_role_name=desired_role_name,
2354
            desired_role_arn=self.desired_role_arn,
2355
            runtime=self.runtime,
2356
            tags=self.tags,
2357
            endpoint_urls=self.stage_config.get("aws_endpoint_urls", {}),
2358
            xray_tracing=self.xray_tracing,
2359
            architecture=self.architecture,
2360
        )
2361

2362
        for setting in CUSTOM_SETTINGS:
5✔
2363
            if setting in self.stage_config:
5✔
2364
                setting_val = self.stage_config[setting]
5✔
2365
                # Read the policy file contents.
2366
                if setting.endswith("policy"):
5✔
2367
                    with open(setting_val, "r") as f:
×
2368
                        setting_val = f.read()
×
2369
                setattr(self.zappa, setting, setting_val)
5✔
2370

2371
        if self.app_function:
5✔
2372
            self.collision_warning(self.app_function)
5✔
2373
            if self.app_function[-3:] == ".py":
5✔
2374
                click.echo(
×
2375
                    click.style("Warning!", fg="red", bold=True)
2376
                    + " Your app_function is pointing to a "
2377
                    + click.style("file and not a function", bold=True)
2378
                    + "! It should probably be something like 'my_file.app', not 'my_file.py'!"
2379
                )
2380

2381
        return self.zappa
5✔
2382

2383
    def get_json_or_yaml_settings(self, settings_name="zappa_settings"):
5✔
2384
        """
2385
        Return zappa_settings path as JSON or YAML (or TOML), as appropriate.
2386
        """
2387
        zs_json = settings_name + ".json"
5✔
2388
        zs_yml = settings_name + ".yml"
5✔
2389
        zs_yaml = settings_name + ".yaml"
5✔
2390
        zs_toml = settings_name + ".toml"
5✔
2391

2392
        # Must have at least one
2393
        if (
5✔
2394
            not os.path.isfile(zs_json)
2395
            and not os.path.isfile(zs_yml)
2396
            and not os.path.isfile(zs_yaml)
2397
            and not os.path.isfile(zs_toml)
2398
        ):
2399
            raise ClickException("Please configure a zappa_settings file or call `zappa init`.")
5✔
2400

2401
        # Prefer JSON
2402
        if os.path.isfile(zs_json):
5✔
2403
            settings_file = zs_json
5✔
2404
        elif os.path.isfile(zs_toml):
5✔
2405
            settings_file = zs_toml
5✔
2406
        elif os.path.isfile(zs_yml):
5✔
2407
            settings_file = zs_yml
5✔
2408
        else:
2409
            settings_file = zs_yaml
5✔
2410

2411
        return settings_file
5✔
2412

2413
    def load_settings_file(self, settings_file=None):
5✔
2414
        """
2415
        Load our settings file.
2416
        """
2417

2418
        if not settings_file:
5✔
2419
            settings_file = self.get_json_or_yaml_settings()
5✔
2420
        if not os.path.isfile(settings_file):
5✔
2421
            raise ClickException("Please configure your zappa_settings file or call `zappa init`.")
×
2422

2423
        path, ext = os.path.splitext(settings_file)
5✔
2424
        if ext == ".yml" or ext == ".yaml":
5✔
2425
            with open(settings_file) as yaml_file:
5✔
2426
                try:
5✔
2427
                    self.zappa_settings = yaml.safe_load(yaml_file)
5✔
2428
                except ValueError:  # pragma: no cover
2429
                    raise ValueError("Unable to load the Zappa settings YAML. It may be malformed.")
2430
        elif ext == ".toml":
5✔
2431
            with open(settings_file) as toml_file:
5✔
2432
                try:
5✔
2433
                    self.zappa_settings = toml.load(toml_file)
5✔
2434
                except ValueError:  # pragma: no cover
2435
                    raise ValueError("Unable to load the Zappa settings TOML. It may be malformed.")
2436
        else:
2437
            with open(settings_file) as json_file:
5✔
2438
                try:
5✔
2439
                    self.zappa_settings = json.load(json_file)
5✔
2440
                except ValueError:  # pragma: no cover
2441
                    raise ValueError("Unable to load the Zappa settings JSON. It may be malformed.")
2442

2443
    def create_package(self, output=None, use_zappa_release: Optional[str] = None):
5✔
2444
        """
2445
        Ensure that the package can be properly configured,
2446
        and then create it.
2447
        """
2448

2449
        # Create the Lambda zip package (includes project and virtualenvironment)
2450
        # Also define the path the handler file so it can be copied to the zip
2451
        # root for Lambda.
2452
        current_file = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))  # type: ignore[arg-type]
5✔
2453
        handler_file = os.sep.join(current_file.split(os.sep)[0:]) + os.sep + "handler.py"
5✔
2454

2455
        # Create the zip file(s)
2456
        if self.stage_config.get("slim_handler", False):
5✔
2457
            # Create two zips. One with the application and the other with just the handler.
2458
            # https://github.com/Miserlou/Zappa/issues/510
2459
            self.zip_path = self.zappa.create_lambda_zip(  # type: ignore[attr-defined]
5✔
2460
                prefix=self.lambda_name,
2461
                use_precompiled_packages=self.stage_config.get("use_precompiled_packages", True),
2462
                exclude=self.stage_config.get("exclude", []),
2463
                exclude_glob=self.stage_config.get("exclude_glob", []),
2464
                disable_progress=self.disable_progress,
2465
                archive_format="tarball",
2466
            )
2467

2468
            # Make sure the normal venv is not included in the handler's zip
2469
            exclude = self.stage_config.get("exclude", [])
5✔
2470
            cur_venv = self.zappa.get_current_venv()  # type: ignore[attr-defined]
5✔
2471
            exclude.append(cur_venv.name)
5✔
2472
            self.handler_path = self.zappa.create_lambda_zip(  # type: ignore[attr-defined]
5✔
2473
                prefix="handler_{0!s}".format(self.lambda_name),
2474
                venv=self.zappa.create_handler_venv(use_zappa_release=use_zappa_release),  # type: ignore[attr-defined]
2475
                handler_file=handler_file,
2476
                slim_handler=True,
2477
                exclude=exclude,
2478
                exclude_glob=self.stage_config.get("exclude_glob", []),
2479
                output=output,
2480
                disable_progress=self.disable_progress,
2481
            )
2482
        else:
2483
            exclude = self.stage_config.get("exclude", [])
5✔
2484

2485
            # Create a single zip that has the handler and application
2486
            self.zip_path = self.zappa.create_lambda_zip(  # type: ignore[attr-defined]
5✔
2487
                prefix=self.lambda_name,
2488
                handler_file=handler_file,
2489
                use_precompiled_packages=self.stage_config.get("use_precompiled_packages", True),
2490
                exclude=exclude,
2491
                exclude_glob=self.stage_config.get("exclude_glob", []),
2492
                output=output,
2493
                disable_progress=self.disable_progress,
2494
            )
2495

2496
            # Warn if this is too large for Lambda.
2497
            file_stats = os.stat(self.zip_path)
5✔
2498
            if file_stats.st_size > 52428800:  # pragma: no cover
2499
                print(
2500
                    "\n\nWarning: Application zip package is likely to be too large for AWS Lambda. "
2501
                    'Try setting "slim_handler" to true in your Zappa settings file.\n\n'
2502
                )
2503

2504
        # Throw custom settings into the zip that handles requests
2505
        if self.stage_config.get("slim_handler", False):
5✔
2506
            handler_zip = self.handler_path
5✔
2507
        else:
2508
            handler_zip = self.zip_path
5✔
2509

2510
        with zipfile.ZipFile(handler_zip, "a") as lambda_zip:  # type: ignore[call-overload]
5✔
2511
            settings_s = self.get_zappa_settings_string()
5✔
2512

2513
            # Copy our Django app into root of our package.
2514
            # It doesn't work otherwise.
2515
            if self.django_settings:
5✔
2516
                base = __file__.rsplit(os.sep, 1)[0]
×
2517
                django_py = "".join(os.path.join(base, "ext", "django_zappa.py"))
×
2518
                lambda_zip.write(django_py, "django_zappa_app.py")
×
2519

2520
            # Lambda requires a specific chmod
2521
            temp_settings = tempfile.NamedTemporaryFile(delete=False)
5✔
2522
            os.chmod(temp_settings.name, 0o644)
5✔
2523
            temp_settings.write(bytes(settings_s, "utf-8"))
5✔
2524
            temp_settings.close()
5✔
2525
            lambda_zip.write(temp_settings.name, "zappa_settings.py")
5✔
2526
            os.unlink(temp_settings.name)
5✔
2527

2528
    def get_zappa_settings_string(self):
5✔
2529
        settings_s = "# Generated by Zappa\n"
5✔
2530

2531
        if self.app_function:
5✔
2532
            if "." not in self.app_function:  # pragma: no cover
2533
                raise ClickException(
2534
                    "Your "
2535
                    + click.style("app_function", fg="red", bold=True)
2536
                    + " value is not a modular path."
2537
                    + " It needs to be in the format `"
2538
                    + click.style("your_module.your_app_object", bold=True)
2539
                    + "`."
2540
                )
2541
            app_module, app_function = self.app_function.rsplit(".", 1)
5✔
2542
            settings_s = settings_s + "APP_MODULE='{0!s}'\nAPP_FUNCTION='{1!s}'\n".format(app_module, app_function)
5✔
2543

2544
        if self.exception_handler:
5✔
2545
            settings_s += "EXCEPTION_HANDLER='{0!s}'\n".format(self.exception_handler)
×
2546
        else:
2547
            settings_s += "EXCEPTION_HANDLER=None\n"
5✔
2548

2549
        if self.debug:
5✔
2550
            settings_s = settings_s + "DEBUG=True\n"
5✔
2551
        else:
2552
            settings_s = settings_s + "DEBUG=False\n"
×
2553

2554
        settings_s = settings_s + "LOG_LEVEL='{0!s}'\n".format((self.log_level))
5✔
2555

2556
        if self.binary_support:
5✔
2557
            settings_s = settings_s + "BINARY_SUPPORT=True\n"
5✔
2558
        else:
2559
            settings_s = settings_s + "BINARY_SUPPORT=False\n"
×
2560

2561
        head_map_dict = {}
5✔
2562
        head_map_dict.update(dict(self.context_header_mappings))
5✔
2563
        settings_s = settings_s + "CONTEXT_HEADER_MAPPINGS={0}\n".format(head_map_dict)
5✔
2564

2565
        # If we're on a domain, we don't need to define the /<<env>> in
2566
        # the WSGI PATH
2567
        if self.domain:
5✔
2568
            settings_s = settings_s + "DOMAIN='{0!s}'\n".format((self.domain))
×
2569
        else:
2570
            settings_s = settings_s + "DOMAIN=None\n"
5✔
2571

2572
        if self.base_path:
5✔
2573
            settings_s = settings_s + "BASE_PATH='{0!s}'\n".format((self.base_path))
×
2574
        else:
2575
            settings_s = settings_s + "BASE_PATH=None\n"
5✔
2576

2577
        # Pass through remote config bucket and path
2578
        if self.remote_env:
5✔
2579
            settings_s = settings_s + "REMOTE_ENV='{0!s}'\n".format(self.remote_env)
5✔
2580
        # DEPRECATED. use remove_env instead
2581
        elif self.remote_env_bucket and self.remote_env_file:
5✔
2582
            settings_s = settings_s + "REMOTE_ENV='s3://{0!s}/{1!s}'\n".format(self.remote_env_bucket, self.remote_env_file)
5✔
2583

2584
        # Local envs
2585
        env_dict = {}
5✔
2586
        if self.aws_region:
5✔
2587
            env_dict["AWS_REGION"] = self.aws_region
×
2588
        env_dict.update(dict(self.environment_variables))
5✔
2589

2590
        # Environment variable keys must be ascii
2591
        # https://github.com/Miserlou/Zappa/issues/604
2592
        # https://github.com/Miserlou/Zappa/issues/998
2593
        try:
5✔
2594
            env_dict = dict((k.encode("ascii").decode("ascii"), v) for (k, v) in env_dict.items())
5✔
2595
        except Exception:
5✔
2596
            raise ValueError("Environment variable keys must be ascii.")
5✔
2597

2598
        settings_s = settings_s + "ENVIRONMENT_VARIABLES={0}\n".format(env_dict)
5✔
2599

2600
        # We can be environment-aware
2601
        settings_s = settings_s + "API_STAGE='{0!s}'\n".format((self.api_stage))
5✔
2602
        settings_s = settings_s + "PROJECT_NAME='{0!s}'\n".format((self.project_name))
5✔
2603

2604
        if self.settings_file:
5✔
2605
            settings_s = settings_s + "SETTINGS_FILE='{0!s}'\n".format((self.settings_file))
×
2606
        else:
2607
            settings_s = settings_s + "SETTINGS_FILE=None\n"
5✔
2608

2609
        if self.django_settings:
5✔
2610
            settings_s = settings_s + "DJANGO_SETTINGS='{0!s}'\n".format((self.django_settings))
×
2611
        else:
2612
            settings_s = settings_s + "DJANGO_SETTINGS=None\n"
5✔
2613

2614
        # If slim handler, path to project zip
2615
        if self.stage_config.get("slim_handler", False):
5✔
2616
            settings_s += "ARCHIVE_PATH='s3://{0!s}/{1!s}_{2!s}_current_project.tar.gz'\n".format(
5✔
2617
                self.s3_bucket_name, self.api_stage, self.project_name
2618
            )
2619

2620
            # since includes are for slim handler add the setting here by joining arbitrary list from zappa_settings file
2621
            # and tell the handler we are the slim_handler
2622
            # https://github.com/Miserlou/Zappa/issues/776
2623
            settings_s += "SLIM_HANDLER=True\n"
5✔
2624

2625
            include = self.stage_config.get("include", [])
5✔
2626
            if len(include) >= 1:
5✔
2627
                settings_s += "INCLUDE=" + str(include) + "\n"
×
2628

2629
        # AWS Events function mapping
2630
        event_mapping = {}
5✔
2631
        events = self.stage_config.get("events", [])
5✔
2632
        for event in events:
5✔
2633
            arn = event.get("event_source", {}).get("arn")
5✔
2634
            function = event.get("function")
5✔
2635
            if arn and function:
5✔
2636
                event_mapping[arn] = function
5✔
2637
        settings_s = settings_s + "AWS_EVENT_MAPPING={0!s}\n".format(event_mapping)
5✔
2638

2639
        # Map Lext bot events
2640
        bot_events = self.stage_config.get("bot_events", [])
5✔
2641
        bot_events_mapping = {}
5✔
2642
        for bot_event in bot_events:
5✔
2643
            event_source = bot_event.get("event_source", {})
×
2644
            intent = event_source.get("intent")
×
2645
            invocation_source = event_source.get("invocation_source")
×
2646
            function = bot_event.get("function")
×
2647
            if intent and invocation_source and function:
×
2648
                bot_events_mapping[str(intent) + ":" + str(invocation_source)] = function
×
2649

2650
        settings_s = settings_s + "AWS_BOT_EVENT_MAPPING={0!s}\n".format(bot_events_mapping)
5✔
2651

2652
        # Map cognito triggers
2653
        cognito_trigger_mapping = {}
5✔
2654
        cognito_config = self.stage_config.get("cognito", {})
5✔
2655
        triggers = cognito_config.get("triggers", [])
5✔
2656
        for trigger in triggers:
5✔
2657
            source = trigger.get("source")
5✔
2658
            function = trigger.get("function")
5✔
2659
            if source and function:
5✔
2660
                cognito_trigger_mapping[source] = function
5✔
2661
        settings_s = settings_s + "COGNITO_TRIGGER_MAPPING={0!s}\n".format(cognito_trigger_mapping)
5✔
2662

2663
        # Authorizer config
2664
        authorizer_function = self.authorizer.get("function", None)
5✔
2665
        if authorizer_function:
5✔
2666
            settings_s += "AUTHORIZER_FUNCTION='{0!s}'\n".format(authorizer_function)
×
2667

2668
        # async response
2669
        async_response_table = self.stage_config.get("async_response_table", "")
5✔
2670
        settings_s += "ASYNC_RESPONSE_TABLE='{0!s}'\n".format(async_response_table)
5✔
2671

2672
        # additional_text_mimetypes
2673
        additional_text_mimetypes = self.stage_config.get("additional_text_mimetypes", [])
5✔
2674
        settings_s += f"ADDITIONAL_TEXT_MIMETYPES={additional_text_mimetypes}\n"
5✔
2675
        return settings_s
5✔
2676

2677
    def remove_local_zip(self):
5✔
2678
        """
2679
        Remove our local zip file.
2680
        """
2681

2682
        if self.stage_config.get("delete_local_zip", True):
5✔
2683
            try:
5✔
2684
                if os.path.isfile(self.zip_path):
5✔
2685
                    os.remove(self.zip_path)
5✔
2686
                if self.handler_path and os.path.isfile(self.handler_path):
5✔
2687
                    os.remove(self.handler_path)
5✔
2688
            except Exception:  # pragma: no cover
2689
                sys.exit(-1)
2690

2691
    def remove_uploaded_zip(self):
5✔
2692
        """
2693
        Remove the local and S3 zip file after uploading and updating.
2694
        """
2695

2696
        # Remove the uploaded zip from S3, because it is now registered..
2697
        if self.stage_config.get("delete_s3_zip", True):
5✔
2698
            self.zappa.remove_from_s3(self.zip_path, self.s3_bucket_name)
5✔
2699
            if self.stage_config.get("slim_handler", False):
5✔
2700
                # Need to keep the project zip as the slim handler uses it.
2701
                self.zappa.remove_from_s3(self.handler_path, self.s3_bucket_name)
×
2702

2703
    def on_exit(self):
5✔
2704
        """
2705
        Cleanup after the command finishes.
2706
        Always called: SystemExit, KeyboardInterrupt and any other Exception that occurs.
2707
        """
2708
        if self.zip_path:
5✔
2709
            # Only try to remove uploaded zip if we're running a command that has loaded credentials
2710
            if self.load_credentials:
5✔
2711
                self.remove_uploaded_zip()
5✔
2712

2713
            self.remove_local_zip()
5✔
2714

2715
    def print_logs(self, logs, colorize=True, http=False, non_http=False, force_colorize=None):
5✔
2716
        """
2717
        Parse, filter and print logs to the console.
2718
        """
2719

2720
        for log in logs:
5✔
2721
            timestamp = log["timestamp"]
5✔
2722
            message = log["message"]
5✔
2723
            if "START RequestId" in message:
5✔
2724
                continue
5✔
2725
            if "REPORT RequestId" in message:
5✔
2726
                continue
5✔
2727
            if "END RequestId" in message:
5✔
2728
                continue
5✔
2729

2730
            if not colorize and not force_colorize:
5✔
2731
                if http:
5✔
2732
                    if self.is_http_log_entry(message.strip()):
5✔
2733
                        print("[" + str(timestamp) + "] " + message.strip())
5✔
2734
                elif non_http:
5✔
2735
                    if not self.is_http_log_entry(message.strip()):
×
2736
                        print("[" + str(timestamp) + "] " + message.strip())
×
2737
                else:
2738
                    print("[" + str(timestamp) + "] " + message.strip())
5✔
2739
            else:
2740
                if http:
5✔
2741
                    if self.is_http_log_entry(message.strip()):
5✔
2742
                        click.echo(
5✔
2743
                            click.style("[", fg="cyan")
2744
                            + click.style(str(timestamp), bold=True)
2745
                            + click.style("]", fg="cyan")
2746
                            + self.colorize_log_entry(message.strip()),
2747
                            color=force_colorize,
2748
                        )
2749
                elif non_http:
5✔
2750
                    if not self.is_http_log_entry(message.strip()):
5✔
2751
                        click.echo(
5✔
2752
                            click.style("[", fg="cyan")
2753
                            + click.style(str(timestamp), bold=True)
2754
                            + click.style("]", fg="cyan")
2755
                            + self.colorize_log_entry(message.strip()),
2756
                            color=force_colorize,
2757
                        )
2758
                else:
2759
                    click.echo(
5✔
2760
                        click.style("[", fg="cyan")
2761
                        + click.style(str(timestamp), bold=True)
2762
                        + click.style("]", fg="cyan")
2763
                        + self.colorize_log_entry(message.strip()),
2764
                        color=force_colorize,
2765
                    )
2766

2767
    def is_http_log_entry(self, string):
5✔
2768
        """
2769
        Determines if a log entry is an HTTP-formatted log string or not.
2770
        """
2771
        # Debug event filter
2772
        if "Zappa Event" in string:
5✔
2773
            return False
5✔
2774

2775
        # IP address filter
2776
        for token in string.replace("\t", " ").split(" "):
5✔
2777
            try:
5✔
2778
                if token.count(".") == 3 and token.replace(".", "").isnumeric():
5✔
2779
                    return True
5✔
2780
            except Exception:  # pragma: no cover
2781
                pass
2782

2783
        return False
5✔
2784

2785
    def get_project_name(self):
5✔
2786
        return slugify.slugify(os.getcwd().split(os.sep)[-1])[:15]
5✔
2787

2788
    def colorize_log_entry(self, string):
5✔
2789
        """
2790
        Apply various heuristics to return a colorized version of a string.
2791
        If these fail, simply return the string in plaintext.
2792
        """
2793

2794
        final_string = string
5✔
2795
        try:
5✔
2796
            # First, do stuff in square brackets
2797
            inside_squares = re.findall(r"\[([^]]*)\]", string)
5✔
2798
            for token in inside_squares:
5✔
2799
                if token in ["CRITICAL", "ERROR", "WARNING", "DEBUG", "INFO", "NOTSET"]:
5✔
2800
                    final_string = final_string.replace(
5✔
2801
                        "[" + token + "]",
2802
                        click.style("[", fg="cyan") + click.style(token, fg="cyan", bold=True) + click.style("]", fg="cyan"),
2803
                    )
2804
                else:
2805
                    final_string = final_string.replace(
5✔
2806
                        "[" + token + "]",
2807
                        click.style("[", fg="cyan") + click.style(token, bold=True) + click.style("]", fg="cyan"),
2808
                    )
2809

2810
            # Then do quoted strings
2811
            quotes = re.findall(r'"[^"]*"', string)
5✔
2812
            for token in quotes:
5✔
2813
                final_string = final_string.replace(token, click.style(token, fg="yellow"))
5✔
2814

2815
            # And UUIDs
2816
            for token in final_string.replace("\t", " ").split(" "):
5✔
2817
                try:
5✔
2818
                    if token.count("-") == 4 and token.replace("-", "").isalnum():
5✔
2819
                        final_string = final_string.replace(token, click.style(token, fg="magenta"))
5✔
2820
                except Exception:  # pragma: no cover
2821
                    pass
2822

2823
                # And IP addresses
2824
                try:
5✔
2825
                    if token.count(".") == 3 and token.replace(".", "").isnumeric():
5✔
2826
                        final_string = final_string.replace(token, click.style(token, fg="red"))
5✔
2827
                except Exception:  # pragma: no cover
2828
                    pass
2829

2830
                # And status codes
2831
                try:
5✔
2832
                    if token in ["200"]:
5✔
2833
                        final_string = final_string.replace(token, click.style(token, fg="green"))
5✔
2834
                    if token in ["400", "401", "403", "404", "405", "500"]:
5✔
2835
                        final_string = final_string.replace(token, click.style(token, fg="red"))
5✔
2836
                except Exception:  # pragma: no cover
2837
                    pass
2838

2839
            # And Zappa Events
2840
            try:
5✔
2841
                if "Zappa Event:" in final_string:
5✔
2842
                    final_string = final_string.replace(
5✔
2843
                        "Zappa Event:",
2844
                        click.style("Zappa Event:", bold=True, fg="green"),
2845
                    )
2846
            except Exception:  # pragma: no cover
2847
                pass
2848

2849
            # And dates
2850
            for token in final_string.split("\t"):
5✔
2851
                try:
5✔
2852
                    final_string = final_string.replace(token, click.style(token, fg="green"))
5✔
2853
                except Exception:  # pragma: no cover
2854
                    pass
2855

2856
            final_string = final_string.replace("\t", " ").replace("   ", " ")
5✔
2857
            if final_string[0] != " ":
5✔
2858
                final_string = " " + final_string
5✔
2859
            return final_string
5✔
2860
        except Exception:  # pragma: no cover
2861
            return string
2862

2863
    def execute_prebuild_script(self):
5✔
2864
        """
2865
        Parse and execute the prebuild_script from the zappa_settings.
2866
        """
2867

2868
        (pb_mod_path, pb_func) = self.prebuild_script.rsplit(".", 1)
5✔
2869

2870
        try:  # Prefer prebuild script in working directory
5✔
2871
            if pb_mod_path.count(".") >= 1:  # Prebuild script func is nested in a folder
5✔
2872
                (mod_folder_path, mod_name) = pb_mod_path.rsplit(".", 1)
5✔
2873
                mod_folder_path_fragments = mod_folder_path.split(".")
5✔
2874
                working_dir = os.path.join(os.getcwd(), *mod_folder_path_fragments)
5✔
2875
            else:
2876
                mod_name = pb_mod_path
×
2877
                working_dir = os.getcwd()
×
2878

2879
            working_dir_importer = pkgutil.get_importer(working_dir)
5✔
2880
            module_ = working_dir_importer.find_spec(mod_name).loader.load_module(mod_name)
5✔
2881

2882
        except (ImportError, AttributeError):
×
2883
            try:  # Prebuild func might be in virtualenv
×
2884
                module_ = importlib.import_module(pb_mod_path)
×
2885
            except ImportError:  # pragma: no cover
2886
                raise ClickException(
2887
                    click.style("Failed ", fg="red")
2888
                    + "to "
2889
                    + click.style("import prebuild script ", bold=True)
2890
                    + 'module: "{pb_mod_path}"'.format(pb_mod_path=click.style(pb_mod_path, bold=True))
2891
                )
2892

2893
        if not hasattr(module_, pb_func):  # pragma: no cover
2894
            raise ClickException(
2895
                click.style("Failed ", fg="red")
2896
                + "to "
2897
                + click.style("find prebuild script ", bold=True)
2898
                + 'function: "{pb_func}" '.format(pb_func=click.style(pb_func, bold=True))
2899
                + 'in module "{pb_mod_path}"'.format(pb_mod_path=pb_mod_path)
2900
            )
2901

2902
        prebuild_function = getattr(module_, pb_func)
5✔
2903
        prebuild_function()  # Call the function
5✔
2904

2905
    def collision_warning(self, item):
5✔
2906
        """
2907
        Given a string, print a warning if this could
2908
        collide with a Zappa core package module.
2909
        Use for app functions and events.
2910
        """
2911

2912
        namespace_collisions = [
5✔
2913
            "zappa.",
2914
            "wsgi.",
2915
            "middleware.",
2916
            "handler.",
2917
            "util.",
2918
            "letsencrypt.",
2919
            "cli.",
2920
        ]
2921
        for namespace_collision in namespace_collisions:
5✔
2922
            if item.startswith(namespace_collision):
5✔
2923
                click.echo(
5✔
2924
                    click.style("Warning!", fg="red", bold=True)
2925
                    + " You may have a namespace collision between "
2926
                    + click.style(item, bold=True)
2927
                    + " and "
2928
                    + click.style(namespace_collision, bold=True)
2929
                    + "! You may want to rename that file."
2930
                )
2931

2932
    def deploy_api_gateway(self, api_id):
5✔
2933
        cache_cluster_enabled = self.stage_config.get("cache_cluster_enabled", False)
5✔
2934
        cache_cluster_size = str(self.stage_config.get("cache_cluster_size", 0.5))
5✔
2935
        endpoint_url = self.zappa.deploy_api_gateway(
5✔
2936
            api_id=api_id,
2937
            stage_name=self.api_stage,
2938
            cache_cluster_enabled=cache_cluster_enabled,
2939
            cache_cluster_size=cache_cluster_size,
2940
            cloudwatch_log_level=self.stage_config.get("cloudwatch_log_level", "OFF"),
2941
            cloudwatch_data_trace=self.stage_config.get("cloudwatch_data_trace", False),
2942
            cloudwatch_metrics_enabled=self.stage_config.get("cloudwatch_metrics_enabled", False),
2943
            cache_cluster_ttl=self.stage_config.get("cache_cluster_ttl", 300),
2944
            cache_cluster_encrypted=self.stage_config.get("cache_cluster_encrypted", False),
2945
        )
2946
        return endpoint_url
5✔
2947

2948
    def check_venv(self):
5✔
2949
        """Ensure we're inside a virtualenv."""
2950
        if self.vargs and self.vargs.get("no_venv"):
5✔
2951
            return
×
2952
        if self.zappa:
5✔
2953
            venv = self.zappa.get_current_venv()
5✔
2954
        else:
2955
            # Just for `init`, when we don't have settings yet.
2956
            venv = Zappa.get_current_venv()
5✔
2957
        if not venv:
5✔
2958
            raise ClickException(
×
2959
                click.style("Zappa", bold=True)
2960
                + " requires an "
2961
                + click.style("active virtual environment", bold=True, fg="red")
2962
                + "!\n"
2963
                + "Learn more about virtual environments here: "
2964
                + click.style(
2965
                    "http://docs.python-guide.org/en/latest/dev/virtualenvs/",
2966
                    bold=False,
2967
                    fg="cyan",
2968
                )
2969
            )
2970

2971
    def silence(self):
5✔
2972
        """
2973
        Route all stdout to null.
2974
        """
2975

2976
        sys.stdout = open(os.devnull, "w")
×
2977
        sys.stderr = open(os.devnull, "w")
×
2978

2979
    def touch_endpoint(self, endpoint_url):
5✔
2980
        """
2981
        Test the deployed endpoint with a GET request.
2982
        """
2983

2984
        # Private APIGW endpoints most likely can't be reached by a deployer
2985
        # unless they're connected to the VPC by VPN. Instead of trying
2986
        # connect to the service, print a warning and let the user know
2987
        # to check it manually.
2988
        # See: https://github.com/Miserlou/Zappa/pull/1719#issuecomment-471341565
2989
        if "PRIVATE" in self.stage_config.get("endpoint_configuration", []):
×
2990
            print(
×
2991
                click.style("Warning!", fg="yellow", bold=True) + " Since you're deploying a private API Gateway endpoint,"
2992
                " Zappa cannot determine if your function is returning "
2993
                " a correct status code. You should check your API's response"
2994
                " manually before considering this deployment complete."
2995
            )
2996
            return
×
2997

2998
        touch_path = self.stage_config.get("touch_path", "/")
×
2999
        req = requests.get(endpoint_url + touch_path)
×
3000

3001
        # Sometimes on really large packages, it can take 60-90 secs to be
3002
        # ready and requests will return 504 status_code until ready.
3003
        # So, if we get a 504 status code, rerun the request up to 4 times or
3004
        # until we don't get a 504 error
3005
        if req.status_code == 504:
×
3006
            i = 0
×
3007
            status_code = 504
×
3008
            while status_code == 504 and i <= 4:
×
3009
                req = requests.get(endpoint_url + touch_path)
×
3010
                status_code = req.status_code
×
3011
                i += 1
×
3012

3013
        if req.status_code >= 500:
×
3014
            raise ClickException(
×
3015
                click.style("Warning!", fg="red", bold=True)
3016
                + " Status check on the deployed lambda failed."
3017
                + " A GET request to '"
3018
                + touch_path
3019
                + "' yielded a "
3020
                + click.style(str(req.status_code), fg="red", bold=True)
3021
                + " response code."
3022
            )
3023

3024

3025
####################################################################
3026
# Main
3027
####################################################################
3028

3029

3030
def shamelessly_promote():
5✔
3031
    """
3032
    Shamelessly promote our little community.
3033
    """
3034

3035
    click.echo(
5✔
3036
        "Need "
3037
        + click.style("help", fg="green", bold=True)
3038
        + "? Found a "
3039
        + click.style("bug", fg="green", bold=True)
3040
        + "? Let us "
3041
        + click.style("know", fg="green", bold=True)
3042
        + "! :D"
3043
    )
3044
    click.echo(
5✔
3045
        "File bug reports on "
3046
        + click.style("GitHub", bold=True)
3047
        + " here: "
3048
        + click.style("https://github.com/Zappa/Zappa", fg="cyan", bold=True)
3049
    )
3050
    click.echo(
5✔
3051
        "And join our "
3052
        + click.style("Slack", bold=True)
3053
        + " channel here: "
3054
        + click.style("https://zappateam.slack.com", fg="cyan", bold=True)
3055
    )
3056
    click.echo("Love!,")
5✔
3057
    click.echo(" ~ Team " + click.style("Zappa", bold=True) + "!")
5✔
3058

3059

3060
def disable_click_colors():
5✔
3061
    """
3062
    Set a Click context where colors are disabled. Creates a throwaway Command
3063
    to play nicely with the Context constructor.
3064
    The intended side-effect here is that click.echo() checks this context and will
3065
    suppress colors.
3066
    https://github.com/pallets/click/blob/e1aa43a3/click/globals.py#L39
3067
    """
3068

3069
    ctx = Context(Command("AllYourBaseAreBelongToUs"))
5✔
3070
    ctx.color = False
5✔
3071
    push_context(ctx)
5✔
3072

3073

3074
def handle():  # pragma: no cover
3075
    """
3076
    Main program execution handler.
3077
    """
3078

3079
    try:
3080
        cli = ZappaCLI()
3081
        sys.exit(cli.handle())
3082
    except SystemExit as e:  # pragma: no cover
3083
        cli.on_exit()
3084
        sys.exit(e.code)
3085

3086
    except KeyboardInterrupt:  # pragma: no cover
3087
        cli.on_exit()
3088
        sys.exit(130)
3089
    except Exception:
3090
        cli.on_exit()
3091

3092
        click.echo("Oh no! An " + click.style("error occurred", fg="red", bold=True) + "! :(")
3093
        click.echo("\n==============\n")
3094
        import traceback
3095

3096
        traceback.print_exc()
3097
        click.echo("\n==============\n")
3098
        shamelessly_promote()
3099

3100
        sys.exit(-1)
3101

3102

3103
if __name__ == "__main__":  # pragma: no cover
3104
    handle()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc