• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Twingate / kubernetes-operator / 6593582871

20 Oct 2023 11:13PM UTC coverage: 97.941%. First build
6593582871

Pull #1

github-actions

web-flow
Merge 5142e8044 into 50e4a8880
Pull Request #1: feat: Initial operator

67 of 71 branches covered (0.0%)

Branch coverage included in aggregate %.

366 of 366 new or added lines in 12 files covered. (100.0%)

361 of 366 relevant lines covered (98.63%)

0.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.31
/app/api/client.py
1
import logging
1✔
2
from typing import Any
1✔
3

4
import requests
1✔
5
from gql import Client
1✔
6
from gql.transport.exceptions import TransportAlreadyConnected
1✔
7
from gql.transport.requests import RequestsHTTPTransport
1✔
8
from graphql import DocumentNode
1✔
9
from requests.adapters import HTTPAdapter, Retry
1✔
10

11
from app.api.client_resources import TwingateResourceAPIs
1✔
12
from app.api.client_resources_access import TwingateResourceAccessAPIs
1✔
13
from app.settings import TwingateOperatorSettings, get_version
1✔
14

15
log = logging.getLogger(__name__)
1✔
16

17

18
class TwingateRety(Retry):
1✔
19
    """Custom retry object that retries on 429 errors"""
20

21
    # ruff: noqa: FBT002
22
    # (ignoring this here because this is not our function so
23
    # we can't change the signature)
24
    def is_retry(self, method, status_code, has_retry_after=False):
1✔
25
        return status_code == 429 or super().is_retry(
1✔
26
            method, status_code, has_retry_after
27
        )
28

29

30
class TwingateRequestsHTTPTransport(RequestsHTTPTransport):
1✔
31
    def __init__(self, twingate_api_key: str, *args, **kwargs):
1✔
32
        headers = kwargs.pop("headers", {})
1✔
33
        headers.update(
1✔
34
            {
35
                "User-Agent": f"Twingate-Operator/{get_version()}",
36
                "X-API-KEY": twingate_api_key,
37
            }
38
        )
39

40
        kwargs["headers"] = headers
1✔
41
        kwargs["retries"] = 10
1✔
42
        super().__init__(*args, **kwargs)
1✔
43

44
    def connect(self):
1✔
45
        if self.session:
1!
46
            raise TransportAlreadyConnected("Transport is already connected")
×
47

48
        # Creating a session that can later be re-used to configure custom mechanisms
49
        self.session = requests.Session()
1✔
50

51
        # If we specified some retries, we provide a predefined retry-logic
52
        if self.retries > 0:
1!
53
            adapter = HTTPAdapter(
1✔
54
                max_retries=TwingateRety(
55
                    total=self.retries,
56
                    backoff_factor=0.1,
57
                    status_forcelist=[500, 502, 503, 504],
58
                    allowed_methods=None,
59
                )
60
            )
61
            for prefix in "http://", "https://":
1✔
62
                self.session.mount(prefix, adapter)
1✔
63

64

65
class GraphQLMutationError(Exception):
1✔
66
    def __init__(self, mutation_name: str, error: str):
1✔
67
        self.mutation_name = mutation_name
1✔
68
        self.error = error
1✔
69
        self.message = f"{mutation_name} mutation failed."
1✔
70
        super().__init__(self.message)
1✔
71

72

73
class TwingateAPIClient(TwingateResourceAPIs, TwingateResourceAccessAPIs):
1✔
74
    def __init__(
1✔
75
        self,
76
        settings: TwingateOperatorSettings,
77
        *,
78
        fetch_schema_from_transport: bool = False,
79
    ):
80
        self.settings = settings
1✔
81
        self.client = self._get_client(
1✔
82
            fetch_schema_from_transport=fetch_schema_from_transport
83
        )
84

85
    def _get_client(self, *, fetch_schema_from_transport: bool = False) -> Client:
1✔
86
        network = self.settings.network
1✔
87
        host = self.settings.host
1✔
88
        transport = TwingateRequestsHTTPTransport(
1✔
89
            self.settings.api_key, url=f"https://{network}.{host}/api/graphql/"
90
        )
91
        return Client(
1✔
92
            transport=transport, fetch_schema_from_transport=fetch_schema_from_transport
93
        )
94

95
    def execute_gql(
1✔
96
        self, document: DocumentNode, variable_values: dict[str, Any] | None = None
97
    ):
98
        logging.info("Calling %s with %s", document, variable_values)
1✔
99
        result = self.client.execute(document, variable_values=variable_values)
1✔
100
        logging.info("Result: %s", result)
1✔
101
        return result
1✔
102

103
    def execute_mutation(
1✔
104
        self,
105
        name: str,
106
        document: DocumentNode,
107
        variable_values: dict[str, Any] | None = None,
108
    ):
109
        result = self.execute_gql(document, variable_values=variable_values)
1✔
110
        data = result[name]
1✔
111
        if not data["ok"]:
1✔
112
            raise GraphQLMutationError(f"{name} mutation failed.", data.get("error"))
1✔
113

114
        return data
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc