• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lorinkoz / django-pgschemas / 27f50eb8b06010e78666bbd7ef9e4428ec6eb3d6-PR-34

8 May 2020 - 21:55 coverage: 91.923% (-0.2%) from 92.16%
27f50eb8b06010e78666bbd7ef9e4428ec6eb3d6-PR-34

Pull #34

github-actions

GitHub
Merge b61ee16d4 into 63c7fb0bc
Pull Request #34: Database connections in subprocess

2 of 5 new or added lines in 1 file covered. (40.0%)

1104 of 1201 relevant lines covered (91.92%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.55
/django_pgschemas/management/commands/_executors.py
1
import functools
1×
2
import multiprocessing
1×
3

4
from django.conf import settings
1×
5
from django.core.management import call_command
1×
6
from django.core.management.base import BaseCommand, OutputWrapper, CommandError
1×
7
from django.db import connection, transaction, connections
1×
8

9

10
def run_on_schema(
    schema_name,
    executor_codename,
    command,
    function_name=None,
    args=None,
    kwargs=None,
    pass_schema_in_kwargs=False,
    fork_db=False,
):
    """Run a management command (or one of its methods) against a single schema.

    Args:
        schema_name: Schema to activate on the default connection before running.
        executor_codename: Label of the executor, used to prefix output lines.
        command: A ``BaseCommand`` instance, or a command class that is
            instantiated here with no arguments.
        function_name: ``"special:call_command"`` to go through ``call_command``,
            ``"special:run_from_argv"`` to call ``command.run_from_argv(args)``,
            or the name of a method on ``command`` to call with ``*args, **kwargs``.
        args: Positional arguments for the call. Defaults to an empty list.
        kwargs: Keyword arguments for the call. ``stdout`` / ``stderr`` entries
            are extracted and installed on the command instead of being passed
            on. The caller's dict is never modified.
        pass_schema_in_kwargs: When true, ``schema_name`` is added to the
            keyword arguments of the call.
        fork_db: When true, all database connections are closed before the
            schema is set, and the transaction is committed and the connection
            closed after the call. The parallel executor sets this; presumably
            so child processes do not reuse connections inherited from the
            parent -- confirm against the project documentation.

    Returns:
        ``schema_name``, unchanged.
    """
    # Work on private copies: the function pops and adds keys below, and
    # mutating the caller's dict (or a shared default) would leak state from
    # one schema's run into the next.
    args = [] if args is None else args
    kwargs = {} if kwargs is None else dict(kwargs)

    if not isinstance(command, BaseCommand):
        # Parallel executor needs to pass command 'type' instead of 'instance'
        # Therefore, no customizations for the command can be done, nor using custom stdout, stderr
        command = command()

    command.stdout = kwargs.pop("stdout", command.stdout)
    if not isinstance(command.stdout, OutputWrapper):
        command.stdout = OutputWrapper(command.stdout)

    command.stderr = kwargs.pop("stderr", command.stderr)
    if not isinstance(command.stderr, OutputWrapper):
        command.stderr = OutputWrapper(command.stderr)

    # Since we are prepending every output with the schema_name and executor, we need to determine
    # whether we need to do so based on the last ending used to write. If the last write didn't end
    # in '\n' then we don't do the prefixing in order to keep the output looking good.
    class StyleFunc(object):
        last_message = None

        def __call__(self, message):
            last_message = self.last_message
            self.last_message = message
            if last_message is None or last_message.endswith("\n"):
                return "[%s:%s] %s" % (
                    command.style.NOTICE(executor_codename),
                    command.style.NOTICE(schema_name),
                    message,
                )
            return message

    # Each stream tracks its own "last message", hence two separate instances.
    command.stdout.style_func = StyleFunc()
    command.stderr.style_func = StyleFunc()

    if fork_db:
        connections.close_all()
    connection.set_schema_to(schema_name)

    if pass_schema_in_kwargs:
        kwargs["schema_name"] = schema_name

    if function_name == "special:call_command":
        call_command(command, *args, **kwargs)
    elif function_name == "special:run_from_argv":
        command.run_from_argv(args)
    else:
        getattr(command, function_name)(*args, **kwargs)

    if fork_db:
        transaction.commit()
        connection.close()

    return schema_name
1×
72

73

74
def sequential(schemas, command, function_name, args=None, kwargs=None, pass_schema_in_kwargs=False):
    """Run ``command`` on every schema, one after another, in the current process.

    Args:
        schemas: Iterable of schema names to run on, in order.
        command: Command instance (or class) forwarded to ``run_on_schema``.
        function_name: Forwarded to ``run_on_schema`` to select what to call.
        args: Positional arguments for the call. Defaults to an empty list.
        kwargs: Keyword arguments for the call. Defaults to an empty dict.
            The caller's dict is never modified.
        pass_schema_in_kwargs: Forwarded to ``run_on_schema``.

    Returns:
        The ``schemas`` argument, unchanged.
    """
    args = [] if args is None else args
    kwargs = {} if kwargs is None else kwargs

    for schema in schemas:
        run_on_schema(
            schema,
            executor_codename="sequential",
            command=command,
            function_name=function_name,
            args=args,
            # Fresh copy per schema so keys popped or added while handling one
            # schema are not seen by the next one or by the caller.
            kwargs=dict(kwargs),
            pass_schema_in_kwargs=pass_schema_in_kwargs,
            fork_db=False,
        )
    return schemas
1×
88

89

90
def parallel(schemas, command, function_name, args=None, kwargs=None, pass_schema_in_kwargs=False):
    """Run ``command`` on every schema using a pool of worker processes.

    The pool size is read from the ``PGSCHEMAS_PARALLEL_MAX_PROCESSES`` setting;
    when the setting is absent, ``None`` is passed and ``multiprocessing.Pool``
    picks its own default.

    Args:
        schemas: Iterable of schema names to run on.
        command: Command instance; only its type is sent to the workers, so
            per-instance customizations are not carried over.
        function_name: Forwarded to ``run_on_schema`` to select what to call.
        args: Positional arguments for the call. Defaults to an empty list.
        kwargs: Keyword arguments for the call. Defaults to an empty dict.
        pass_schema_in_kwargs: Forwarded to ``run_on_schema``.

    Returns:
        The list produced by ``pool.map``: the value returned by
        ``run_on_schema`` for each schema, in input order.
    """
    args = [] if args is None else args
    kwargs = {} if kwargs is None else kwargs

    processes = getattr(settings, "PGSCHEMAS_PARALLEL_MAX_PROCESSES", None)
    runner = functools.partial(
        run_on_schema,
        executor_codename="parallel",
        command=type(command),  # Can't pass streams to children processes
        function_name=function_name,
        args=args,
        kwargs=kwargs,
        pass_schema_in_kwargs=pass_schema_in_kwargs,
        fork_db=True,
    )

    pool = multiprocessing.Pool(processes=processes)
    try:
        return pool.map(runner, schemas)
    finally:
        # Always release the workers. close() + join() lets in-flight tasks
        # finish instead of killing them mid-command as terminate() would.
        pool.close()
        pool.join()
1×
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2023 Coveralls, Inc