• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lorinkoz / django-pgschemas / 6865619298

14 Nov 2023 03:14PM UTC coverage: 37.347% (+0.6%) from 36.705%
6865619298

push

github

lorinkoz
Fixed autodocs

549 of 1470 relevant lines covered (37.35%)

4.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/django_pgschemas/routing/middleware.py
1
import re
×
2
from typing import Callable, TypeAlias
×
3

4
from asgiref.sync import iscoroutinefunction, sync_to_async
×
5
from django.conf import settings
×
6
from django.http import Http404, HttpRequest, HttpResponse
×
7
from django.shortcuts import redirect
×
8
from django.urls import clear_url_caches, set_urlconf
×
9
from django.utils.decorators import sync_and_async_middleware
×
10

11
from django_pgschemas.routing.info import DomainInfo
×
12
from django_pgschemas.routing.urlresolvers import (
×
13
    get_urlconf_from_schema,
14
)
15
from django_pgschemas.schema import Schema, activate, activate_public
×
16
from django_pgschemas.utils import (
×
17
    get_domain_model,
18
)
19

20

21
def remove_www(path: str) -> str:
    """Return *path* without a leading ``"www."`` label.

    Despite the parameter name, the caller in this module passes a host name.
    Only an exact, lowercase ``"www."`` prefix is removed, and only once; any
    other input (including the empty string) is returned unchanged.
    """
    return path.removeprefix("www.")
×
25

26

27
def strip_tenant_from_path_factory(prefix: str) -> Callable[[str], str]:
    """Build a function that removes a leading ``/<prefix>/`` folder from a path.

    The prefix is compared literally. Interpolating it into a regular
    expression unescaped would let a prefix containing regex metacharacters
    (``.``, ``+``, ``(`` ...) match unrelated paths or raise ``re.error``.

    Args:
        prefix: First path segment identifying the tenant folder.

    Returns:
        A callable mapping ``"/<prefix>/rest"`` to ``"/rest"``. Paths that do
        not start with ``/<prefix>/`` (including ``"/<prefix>"`` without a
        trailing slash) are returned unchanged.
    """
    folder = f"/{prefix}/"

    def strip_tenant_from_path(path: str) -> str:
        if path.startswith(folder):
            # Keep the slash that closed the folder segment as the new root.
            return path[len(folder) - 1 :]
        return path

    return strip_tenant_from_path
×
32

33

34
# Signature of a callable that takes an HttpRequest and returns an HttpResponse.
# Used below to annotate ``get_response`` and the middleware callables produced
# by ``middleware_factory``.
ResponseHandler: TypeAlias = Callable[[HttpRequest], HttpResponse]
×
35

36

37
def _static_tenant_for(hostname: str, domains_key: str):
    """Return a static tenant whose ``domains_key`` list contains *hostname*.

    Scans ``settings.TENANTS`` (skipping the ``"public"`` and ``"default"``
    entries) and returns the result of ``Schema.create(...)`` for the first
    match, or ``None`` when nothing matches.
    """
    for schema, data in settings.TENANTS.items():
        if schema in ("public", "default"):
            continue
        # A tenant entry may omit the key entirely; treat that as "no domains"
        # instead of failing with KeyError.
        if hostname in data.get(domains_key, []):
            return Schema.create(
                schema_name=schema,
                routing=DomainInfo(domain=hostname),
            )
    return None


def _lookup_domain(hostname: str, prefix: str):
    """Return the domain row for *hostname*, preferring one whose folder is *prefix*.

    Falls back to the row with an empty folder. Returns ``None`` when
    ``get_domain_model()`` yields no model or when no row matches.
    """
    ActualDomainModel = get_domain_model()
    if ActualDomainModel is None:
        return None

    domains = ActualDomainModel.objects.select_related("tenant")
    # dict.fromkeys keeps order and drops the duplicate when prefix is already "",
    # so the same failing query is not issued twice.
    for folder in dict.fromkeys((prefix, "")):
        try:
            return domains.get(domain=hostname, folder=folder)
        except ActualDomainModel.DoesNotExist:
            continue
    return None


def route_domain(request: HttpRequest) -> HttpResponse | None:
    """Resolve the tenant for *request* from its host name and activate it.

    Lookup order:

    1. Static tenants in ``settings.TENANTS`` whose ``"DOMAINS"`` list contains
       the host name (``"public"`` and ``"default"`` are skipped).
    2. Rows of the model returned by ``get_domain_model()``, matched on the
       host name plus the first path segment as folder, then on the host name
       with an empty folder.
    3. Static tenants whose ``"FALLBACK_DOMAINS"`` list contains the host name.

    On success ``request.tenant`` and ``request.urlconf`` are set,
    ``set_urlconf`` is called and the tenant is activated. For tenants found
    through the domain model, ``request.strip_tenant_from_path`` is set as well.

    Returns:
        A permanent redirect to the tenant's primary domain when the matched
        domain row has ``redirect_to_primary`` set; otherwise ``None``.

    Raises:
        Http404: No tenant matches the host name.
    """
    # Drop the port, then a leading "www.".
    # NOTE(review): a bracketed IPv6 host such as "[::1]:8000" is truncated to
    # "[" by this split — confirm whether such hosts need support.
    hostname = remove_www(request.get_host().split(":")[0])

    # NOTE(review): presumably so the lookups below run against the public
    # schema — confirm against activate_public().
    activate_public()

    # Static tenants declared in settings win over database-backed domains.
    tenant = _static_tenant_for(hostname, "DOMAINS")

    if tenant is None:
        # First path segment is the candidate tenant folder; guard against a
        # path without any "/" so indexing cannot fail.
        segments = request.path.split("/")
        prefix = segments[1] if len(segments) > 1 else ""

        domain = _lookup_domain(hostname, prefix)

        if domain is not None:
            tenant = domain.tenant

            if prefix and domain.folder == prefix:
                tenant.routing = DomainInfo(domain=hostname, folder=prefix)
                request.strip_tenant_from_path = strip_tenant_from_path_factory(prefix)
                clear_url_caches()  # Required to remove previous tenant prefix from cache (#8)
            else:
                # Matched on host name alone: paths carry no tenant folder.
                tenant.routing = DomainInfo(domain=hostname)
                request.strip_tenant_from_path = lambda x: x

            if domain.redirect_to_primary:
                primary_domain = tenant.domains.get(is_primary=True)
                path = request.strip_tenant_from_path(request.path)
                return redirect(primary_domain.absolute_url(path), permanent=True)

    # Fallback domains are only consulted when nothing else matched.
    if not tenant:
        tenant = _static_tenant_for(hostname, "FALLBACK_DOMAINS")

    # No tenant found from domain / folder
    if not tenant:
        raise Http404(f"No tenant for hostname '{hostname}'")

    urlconf = get_urlconf_from_schema(tenant)

    request.tenant = tenant
    request.urlconf = urlconf
    set_urlconf(urlconf)

    activate(tenant)
×
112

113

114
def route_session(request: HttpRequest) -> HttpResponse | None:
    """Placeholder routing handler: ignores *request* and always yields ``None``.

    NOTE(review): presumably reserved for session-based tenant routing — confirm.
    """
    return None
×
116

117

118
def route_headers(request: HttpRequest) -> HttpResponse | None:
    """Placeholder routing handler: ignores *request* and always yields ``None``.

    NOTE(review): presumably reserved for header-based tenant routing — confirm.
    """
    return None
×
120

121

122
def middleware_factory(
    handler: Callable[[HttpRequest], HttpResponse | None]
) -> Callable[[ResponseHandler], ResponseHandler]:
    """Turn a routing *handler* into a middleware usable in sync and async stacks.

    Args:
        handler: Plain (non-coroutine) callable invoked with each request. A
            non-``None`` return value is sent back as the response and the rest
            of the chain is skipped; ``None`` lets the request continue to
            ``get_response``.

    Returns:
        A middleware factory decorated with ``sync_and_async_middleware``. It
        produces a coroutine function when ``get_response`` is a coroutine
        function and a regular function otherwise.
    """

    @sync_and_async_middleware
    def middleware(get_response: ResponseHandler) -> ResponseHandler:
        if iscoroutinefunction(get_response):
            # The handler itself is synchronous; wrap it so it can be awaited.
            async_handler = sync_to_async(handler)

            async def async_middleware(request: HttpRequest) -> HttpResponse | None:
                response = await async_handler(request)
                if response is not None:
                    return response

                return await get_response(request)

            return async_middleware

        def sync_middleware(request: HttpRequest) -> HttpResponse | None:
            response = handler(request)
            if response is not None:
                return response

            return get_response(request)

        return sync_middleware

    return middleware
×
149

150

151
# Middleware that resolves the tenant through ``route_domain``.
DomainRoutingMiddleware = middleware_factory(route_domain)
×
152
# Middleware built around ``route_session``, which currently returns None for every request.
SessionRoutingMiddleware = middleware_factory(route_session)
×
153
# Middleware built around ``route_headers``, which currently returns None for every request.
HeadersRoutingMiddleware = middleware_factory(route_headers)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc