• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lorinkoz / django-pgschemas / 6698311078

30 Oct 2023 08:23PM UTC coverage: 93.617%. Remained the same
6698311078

Pull #192

github

web-flow
Merge 2f3cf7f8d into 4be7e4d2b
Pull Request #192: Bump ruff from 0.1.2 to 0.1.3

1188 of 1269 relevant lines covered (93.62%)

5.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.08
/django_pgschemas/management/commands/runschema.py
1
import argparse
6✔
2
import sys
6✔
3

4
from django.core.management import get_commands, load_command_class
6✔
5
from django.core.management.base import BaseCommand, CommandError, SystemCheckError
6✔
6

7
from . import WrappedSchemaOption
6✔
8

9

10
class Command(WrappedSchemaOption, BaseCommand):
6✔
11
    help = "Wrapper around Django commands for use with an individual schema"
6✔
12

13
    def add_arguments(self, parser):
6✔
14
        super().add_arguments(parser)
×
15
        parser.add_argument("command_name", help="The command name you want to run")
×
16

17
    def get_command_from_arg(self, arg):
6✔
18
        *chunks, command = arg.split(".")
6✔
19
        path = ".".join(chunks)
6✔
20
        if not path:
6✔
21
            path = get_commands().get(command)
×
22
        try:
6✔
23
            cmd = load_command_class(path, command)
6✔
24
        except Exception:
×
25
            raise CommandError("Unknown command: %s" % arg)
×
26
        if isinstance(cmd, WrappedSchemaOption):
6✔
27
            raise CommandError("Command '%s' cannot be used in runschema" % arg)
×
28
        return cmd
6✔
29

30
    def run_from_argv(self, argv):  # pragma: no cover
31
        """
32
        Changes the option_list to use the options from the wrapped command.
33
        Adds schema parameter to specify which schema will be used when
34
        executing the wrapped command.
35
        """
36
        try:
37
            # load the command object.
38
            if len(argv) <= 2:
39
                raise CommandError("No command to run")
40
            target_class = self.get_command_from_arg(argv[2])
41
            # Ugly, but works. Delete command_name from the argv, parse the schemas manually
42
            # and forward the rest of the arguments to the actual command being wrapped.
43
            del argv[1]
44
            schema_parser = argparse.ArgumentParser()
45
            super().add_arguments(schema_parser)
46
            schema_ns, args = schema_parser.parse_known_args(argv)
47

48
            schemas = self.get_schemas_from_options(
49
                schemas=schema_ns.schemas,
50
                all_schemas=schema_ns.all_schemas,
51
                static_schemas=schema_ns.static_schemas,
52
                dynamic_schemas=schema_ns.dynamic_schemas,
53
                tenant_schemas=schema_ns.tenant_schemas,
54
            )
55
            executor = self.get_executor_from_options(parallel=schema_ns.parallel)
56
        except Exception as e:
57
            if not isinstance(e, CommandError):
58
                raise
59
            # SystemCheckError takes care of its own formatting.
60
            if isinstance(e, SystemCheckError):
61
                self.stderr.write(str(e), lambda x: x)
62
            else:
63
                self.stderr.write("%s: %s" % (e.__class__.__name__, e))
64
            sys.exit(1)
65

66
        executor(schemas, target_class, "special:run_from_argv", args)
67

68
    def handle(self, *args, **options):
6✔
69
        target = self.get_command_from_arg(options.pop("command_name"))
6✔
70
        schemas = self.get_schemas_from_options(**options)
6✔
71
        executor = self.get_executor_from_options(**options)
6✔
72
        options.pop("schemas")
6✔
73
        options.pop("excluded_schemas")
6✔
74
        options.pop("all_schemas")
6✔
75
        options.pop("static_schemas")
6✔
76
        options.pop("dynamic_schemas")
6✔
77
        options.pop("tenant_schemas")
6✔
78
        options.pop("parallel")
6✔
79
        options.pop("skip_schema_creation")
6✔
80
        if self.allow_interactive:
6✔
81
            options.pop("interactive")
×
82
        executor(schemas, target, "special:call_command", args, options)
6✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc