• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Twingate / kubernetes-operator / 15149455378

20 May 2025 10:53PM UTC coverage: 96.576%. First build
15149455378

Pull #648

github

web-flow
Merge a8177b615 into 9d76ffabf
Pull Request #648: feat: Support for Twingate Kubernetes Access Gateway

189 of 214 branches covered (88.32%)

Branch coverage included in aggregate %.

119 of 121 new or added lines in 5 files covered. (98.35%)

1193 of 1217 relevant lines covered (98.03%)

0.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.04
/app/api/client_resources.py
1
import base64
1✔
2
from datetime import datetime
1✔
3
from typing import Any, Literal
1✔
4

5
from gql import gql
1✔
6
from gql.transport.exceptions import TransportQueryError
1✔
7
from pydantic import BaseModel, ConfigDict, Field
1✔
8
from pydantic.alias_generators import to_camel
1✔
9

10
from app.api.exceptions import GraphQLMutationError
1✔
11
from app.api.protocol import TwingateClientProtocol
1✔
12
from app.crds import ProtocolPolicy, ResourceProxy, ResourceSpec, ResourceType
1✔
13

14

15
class ResourceAddress(BaseModel):
1✔
16
    model_config = ConfigDict(arbitrary_types_allowed=True)
1✔
17

18
    type: Literal["IP", "DNS"]
1✔
19
    value: str
1✔
20

21

22
class ResourceRemoteNetwork(BaseModel):
1✔
23
    id: str
1✔
24

25

26
class ResourceSecurityPolicy(BaseModel):
1✔
27
    id: str
1✔
28

29

30
class ProtocoRange(BaseModel):
1✔
31
    model_config = ConfigDict(
1✔
32
        frozen=True, populate_by_name=True, alias_generator=to_camel
33
    )
34

35
    start: int = Field(ge=0, le=65535)
1✔
36
    end: int = Field(ge=0, le=65535)
1✔
37

38

39
class ResourceProtocol(BaseModel):
1✔
40
    model_config = ConfigDict(
1✔
41
        frozen=True, populate_by_name=True, alias_generator=to_camel
42
    )
43

44
    policy: ProtocolPolicy = ProtocolPolicy.ALLOW_ALL
1✔
45
    ports: list[ProtocoRange] = Field(default_factory=list)
1✔
46

47

48
class ResourceProtocols(BaseModel):
1✔
49
    model_config = ConfigDict(
1✔
50
        frozen=True, populate_by_name=True, alias_generator=to_camel
51
    )
52

53
    allow_icmp: bool = True
1✔
54
    tcp: ResourceProtocol = Field(default_factory=ResourceProtocol)
1✔
55
    udp: ResourceProtocol = Field(default_factory=ResourceProtocol)
1✔
56

57

58
class Tag(BaseModel):
1✔
59
    model_config = ConfigDict(
1✔
60
        frozen=True, populate_by_name=True, alias_generator=to_camel
61
    )
62

63
    key: str
1✔
64
    value: str
1✔
65

66

67
class BaseResource(BaseModel):
1✔
68
    model_config = ConfigDict(populate_by_name=True, alias_generator=to_camel)
1✔
69

70
    id: str
1✔
71
    name: str
1✔
72
    created_at: datetime
1✔
73
    updated_at: datetime
1✔
74
    address: ResourceAddress
1✔
75
    is_visible: bool
1✔
76
    remote_network: ResourceRemoteNetwork
1✔
77
    alias: str | None = None
1✔
78
    security_policy: ResourceSecurityPolicy | None = Field(
1✔
79
        alias="securityPolicy", default=None
80
    )
81
    protocols: ResourceProtocols = Field(default_factory=ResourceProtocols)
1✔
82
    tags: list[Tag]
1✔
83

84
    @staticmethod
1✔
85
    def get_graphql_fragment():
1✔
86
        return """
1✔
87
            fragment BaseResourceFields on Resource {
88
                id
89
                name
90
                createdAt
91
                updatedAt
92
                address {
93
                  type
94
                  value
95
                }
96
                alias
97
                isVisible
98
                remoteNetwork { id }
99
                securityPolicy { id }
100
                protocols {
101
                    allowIcmp
102
                    tcp {
103
                        policy
104
                        ports {
105
                            start
106
                            end
107
                        }
108
                    }
109
                    udp {
110
                        policy
111
                        ports {
112
                            start
113
                            end
114
                        }
115
                    }
116
                }
117
                tags {
118
                    key
119
                    value
120
                }
121
            }
122
        """
123

124
    def is_matching_spec(self, crd: ResourceSpec) -> bool:
1✔
125
        self_protocols = self.protocols.model_dump() if self.protocols else None
1✔
126
        crd_protocols = crd.protocols.model_dump() if crd.protocols else None
1✔
127

128
        return (
1✔
129
            self.name == crd.name
130
            and self.address.value == crd.address
131
            and self.alias == crd.alias
132
            and self.is_visible == crd.is_visible
133
            and self_protocols == crd_protocols
134
            and self.remote_network.id == crd.remote_network_id
135
            and (self.security_policy and self.security_policy.id)
136
            == crd.security_policy_id
137
        )
138

139
    def to_spec_dict(self) -> dict[str, Any]:
1✔
140
        data = self.model_dump(
1✔
141
            include={
142
                "id",
143
                "name",
144
                "alias",
145
                "is_visible",
146
                "protocols",
147
                "tags",
148
            }
149
        )
150
        data["address"] = self.address.value
1✔
151
        data["remote_network_id"] = self.remote_network.id
1✔
152
        data["security_policy_id"] = (
1✔
153
            self.security_policy.id if self.security_policy else None
154
        )
155
        return data
1✔
156

157
    def is_matching_labels(self, crd_labels: dict[str, str]) -> bool:
1✔
158
        return crd_labels == self.to_metadata_labels()
1✔
159

160
    def to_metadata_labels(self) -> dict[str, str]:
1✔
161
        return {tag.key: tag.value for tag in self.tags}
1✔
162

163

164
class NetworkResource(BaseResource):
1✔
165
    is_browser_shortcut_enabled: bool
1✔
166

167
    @staticmethod
1✔
168
    def get_graphql_fragment():
1✔
169
        return (
1✔
170
            BaseResource.get_graphql_fragment()
171
            + """
172
            fragment NetworkResourceFields on NetworkResource {
173
                ...BaseResourceFields
174
                isBrowserShortcutEnabled
175
            }
176
            """
177
        )
178

179
    def is_matching_spec(self, crd: ResourceSpec) -> bool:
1✔
180
        return (
1✔
181
            super().is_matching_spec(crd)
182
            and self.is_browser_shortcut_enabled == crd.is_browser_shortcut_enabled
183
        )
184

185
    def to_spec(self, **overrides: Any) -> ResourceSpec:
1✔
186
        data: dict[str, Any] = (
1✔
187
            {
188
                "type": ResourceType.NETWORK,
189
                "is_browser_shortcut_enabled": self.is_browser_shortcut_enabled,
190
            }
191
            | super().to_spec_dict()
192
            | overrides
193
        )
194

195
        return ResourceSpec(**data)
1✔
196

197

198
class KubernetesResource(BaseResource):
1✔
199
    proxy_address: str
1✔
200
    certificate_authority_cert: str
1✔
201

202
    @staticmethod
1✔
203
    def get_graphql_fragment():
1✔
204
        return (
1✔
205
            BaseResource.get_graphql_fragment()
206
            + """
207
            fragment KubernetesResourceFields on KubernetesResource {
208
                ...BaseResourceFields
209
                proxyAddress
210
                certificateAuthorityCert
211
            }
212
            """
213
        )
214

215
    def is_matching_spec(self, crd: ResourceSpec) -> bool:
1✔
NEW
216
        return (
×
217
            super().is_matching_spec(crd)
218
            and crd.proxy is not None
219
            and self.proxy_address == crd.proxy.address
220
            and self.certificate_authority_cert == crd.proxy.certificate_authority_cert
221
        )
222

223
    def to_spec(self, **overrides: Any) -> ResourceSpec:
1✔
224
        data: dict[str, Any] = (
1✔
225
            {
226
                "type": ResourceType.KUBERNETES,
227
                "proxy": ResourceProxy(
228
                    address=self.proxy_address,
229
                    certificate_authority_cert=base64.b64encode(
230
                        self.certificate_authority_cert.encode()
231
                    ).decode(),
232
                ),
233
            }
234
            | super().to_spec_dict()
235
            | overrides
236
        )
237
        return ResourceSpec(**data)
1✔
238

239

240
# fmt:off
241

242
_NETWORK_RESOURCE_FRAGMENT = NetworkResource.get_graphql_fragment()
1✔
243
_KUBERNETES_RESOURCE_FRAGMENT = KubernetesResource.get_graphql_fragment()
1✔
244

245
QUERY_GET_RESOURCE = gql(BaseResource.get_graphql_fragment() + """
1✔
246
    query GetResource($id: ID!) {
247
        resource(id: $id) {
248
            __typename
249
            ...BaseResourceFields
250
            ... on NetworkResource {
251
                isBrowserShortcutEnabled
252
            }
253
            ... on KubernetesResource {
254
                proxyAddress
255
                certificateAuthorityCert
256
            }
257
        }
258
    }
259
"""
260
)
261

262
# region Resource Create Mutations
263

264
MUT_CREATE_RESOURCE = gql(_NETWORK_RESOURCE_FRAGMENT + """
1✔
265
    mutation CreateNetworkResource(
266
        $name: String!
267
        $address: String!
268
        $alias: String
269
        $isVisible: Boolean
270
        $isBrowserShortcutEnabled: Boolean
271
        $protocols: ProtocolsInput
272
        $remoteNetworkId: ID!
273
        $securityPolicyId: ID
274
        $tags: [TagInput!]
275
    ) {
276
        resourceCreate(
277
            name: $name
278
            address: $address
279
            alias: $alias
280
            isVisible: $isVisible
281
            isBrowserShortcutEnabled: $isBrowserShortcutEnabled
282
            protocols: $protocols
283
            remoteNetworkId: $remoteNetworkId
284
            securityPolicyId: $securityPolicyId
285
            tags: $tags
286
        ) {
287
            ok
288
            error
289
            entity {
290
              ...NetworkResourceFields
291
            }
292
        }
293
    }
294
"""
295
)
296
MUT_CREATE_KUBERNETES_RESOURCE = gql(_KUBERNETES_RESOURCE_FRAGMENT + """
1✔
297
    mutation CreateKubernetesResource(
298
        $name: String!
299
        $address: String!
300
        $alias: String
301
        $isVisible: Boolean
302
        $protocols: ProtocolsInput
303
        $remoteNetworkId: ID!
304
        $securityPolicyId: ID
305
        $tags: [TagInput!]
306
        $proxyAddress: String!
307
        $certificateAuthorityCert: String!
308
    ) {
309
        kubernetesResourceCreate(
310
            name: $name
311
            address: $address
312
            alias: $alias
313
            isVisible: $isVisible
314
            protocols: $protocols
315
            remoteNetworkId: $remoteNetworkId
316
            securityPolicyId: $securityPolicyId
317
            tags: $tags
318
            proxyAddress: $proxyAddress
319
            certificateAuthorityCert: $certificateAuthorityCert
320
        ) {
321
            ok
322
            error
323
            entity {
324
              ...KubernetesResourceFields
325
            }
326
        }
327
    }
328
"""
329
)
330

331
# endregion
332

333
# region Resource Update Mutations
334

335
MUT_UPDATE_NETWORK_RESOURCE = gql(_NETWORK_RESOURCE_FRAGMENT + """
1✔
336
    mutation UpdateNetworkResource(
337
        $id: ID!
338
        $name: String!
339
        $address: String!
340
        $alias: String
341
        $isVisible: Boolean
342
        $isBrowserShortcutEnabled: Boolean
343
        $protocols: ProtocolsInput
344
        $remoteNetworkId: ID!
345
        $securityPolicyId: ID
346
        $tags: [TagInput!]
347
    ) {
348
        resourceUpdate(
349
            id: $id,
350
            name: $name
351
            address: $address
352
            alias: $alias
353
            isVisible: $isVisible
354
            isBrowserShortcutEnabled: $isBrowserShortcutEnabled
355
            protocols: $protocols
356
            remoteNetworkId: $remoteNetworkId
357
            securityPolicyId: $securityPolicyId
358
            tags: $tags
359
        ) {
360
            ok
361
            error
362
            entity {
363
                ...NetworkResourceFields
364
            }
365
        }
366
    }
367
"""
368
)
369
MUT_UPDATE_KUBERNETES_RESOURCE = gql(_KUBERNETES_RESOURCE_FRAGMENT + """
1✔
370
    mutation UpdateKubernetesResource(
371
        $id: ID!
372
        $name: String!
373
        $address: String!
374
        $alias: String
375
        $isVisible: Boolean
376
        $protocols: ProtocolsInput
377
        $remoteNetworkId: ID!
378
        $securityPolicyId: ID
379
        $tags: [TagInput!]
380
        $proxyAddress: String
381
        $certificateAuthorityCert: String
382
    ) {
383
        kubernetesResourceUpdate(
384
            id: $id,
385
            name: $name
386
            address: $address
387
            alias: $alias
388
            isVisible: $isVisible
389
            protocols: $protocols
390
            remoteNetworkId: $remoteNetworkId
391
            securityPolicyId: $securityPolicyId
392
            tags: $tags
393
            proxyAddress: $proxyAddress
394
            certificateAuthorityCert: $certificateAuthorityCert
395
        ) {
396
            ok
397
            error
398
            entity {
399
                ...KubernetesResourceFields
400
            }
401
        }
402
    }
403
"""
404
)
405

406
# endregion
407

408

409

410
MUT_DELETE_RESOURCE = gql("""
1✔
411
    mutation DeleteResource($id: ID!) {
412
        resourceDelete(id: $id) {
413
            ok
414
            error
415
        }
416
    }
417
"""
418
)
419

420
# fmt:on
421

422

423
class TwingateResourceAPIs:
1✔
424
    def get_resource(
1✔
425
        self: TwingateClientProtocol, resource_id: str
426
    ) -> NetworkResource | KubernetesResource | None:
427
        try:
1✔
428
            result = self.execute_gql(
1✔
429
                QUERY_GET_RESOURCE, variable_values={"id": resource_id}
430
            )
431
            if not result["resource"]:
1!
NEW
432
                return None
×
433

434
            resource_type = result["resource"]["__typename"]
1✔
435
            match resource_type:
1✔
436
                case "NetworkResource":
1✔
437
                    return NetworkResource(**result["resource"])
1✔
438
                case "KubernetesResource":
1✔
439
                    return KubernetesResource(**result["resource"])
1✔
440
                case _:
1✔
441
                    raise ValueError(f"Invalid Resource Type: {resource_type}")
1✔
442
        except TransportQueryError:
1✔
443
            self.logger.exception("Failed to get resource")
1✔
444
            return None
1✔
445

446
    def resource_create(
1✔
447
        self: TwingateClientProtocol, resource_type: ResourceType, **graphql_arguments
448
    ) -> NetworkResource | KubernetesResource:
449
        if resource_type == ResourceType.KUBERNETES:
1✔
450
            return self.kubernetes_resource_create(**graphql_arguments)  # type: ignore[attr-defined]
1✔
451

452
        return self.network_resource_create(**graphql_arguments)  # type: ignore[attr-defined]
1✔
453

454
    def network_resource_create(
1✔
455
        self: TwingateClientProtocol,
456
        *,
457
        name: str,
458
        address: str,
459
        alias: str | None,
460
        is_visible: bool,
461
        is_browser_shortcut_enabled: bool,
462
        remote_network_id: str,
463
        security_policy_id: str | None,
464
        protocols: dict[str, Any],
465
        tags: list[dict[str, str]],
466
    ) -> NetworkResource:
467
        result = self.execute_mutation(
1✔
468
            "resourceCreate",
469
            MUT_CREATE_RESOURCE,
470
            variable_values={
471
                "name": name,
472
                "address": address,
473
                "alias": alias,
474
                "isVisible": is_visible,
475
                "isBrowserShortcutEnabled": is_browser_shortcut_enabled,
476
                "remoteNetworkId": remote_network_id,
477
                "securityPolicyId": security_policy_id,
478
                "protocols": protocols,
479
                "tags": tags,
480
            },
481
        )
482
        return NetworkResource(**result["entity"])
1✔
483

484
    def kubernetes_resource_create(
1✔
485
        self: TwingateClientProtocol,
486
        *,
487
        name: str,
488
        address: str,
489
        alias: str | None,
490
        is_visible: bool,
491
        remote_network_id: str,
492
        security_policy_id: str | None,
493
        protocols: dict[str, Any],
494
        tags: list[dict[str, str]],
495
        proxy_address: str,
496
        certificate_authority_cert: str,
497
    ) -> KubernetesResource:
498
        result = self.execute_mutation(
1✔
499
            "kubernetesResourceCreate",
500
            MUT_CREATE_KUBERNETES_RESOURCE,
501
            variable_values={
502
                "name": name,
503
                "address": address,
504
                "alias": alias,
505
                "isVisible": is_visible,
506
                "remoteNetworkId": remote_network_id,
507
                "securityPolicyId": security_policy_id,
508
                "protocols": protocols,
509
                "tags": tags,
510
                "proxyAddress": proxy_address,
511
                "certificateAuthorityCert": certificate_authority_cert,
512
            },
513
        )
514
        return KubernetesResource(**result["entity"])
1✔
515

516
    def resource_update(
1✔
517
        self: TwingateClientProtocol,
518
        id: str,
519
        resource_type: ResourceType,
520
        **graphql_arguments,
521
    ) -> NetworkResource | KubernetesResource | None:
522
        if resource_type == ResourceType.KUBERNETES:
1✔
523
            return self.kubernetes_resource_update(id=id, **graphql_arguments)  # type: ignore[attr-defined]
1✔
524

525
        return self.network_resource_update(id=id, **graphql_arguments)  # type: ignore[attr-defined]
1✔
526

527
    def network_resource_update(
1✔
528
        self: TwingateClientProtocol,
529
        *,
530
        id: str,
531
        name: str,
532
        address: str,
533
        alias: str | None,
534
        is_visible: bool,
535
        is_browser_shortcut_enabled: bool,
536
        remote_network_id: str,
537
        security_policy_id: str | None,
538
        protocols: dict[str, Any],
539
        tags: list[dict[str, str]],
540
    ) -> NetworkResource | None:
541
        result = self.execute_mutation(
1✔
542
            "resourceUpdate",
543
            MUT_UPDATE_NETWORK_RESOURCE,
544
            variable_values={
545
                "id": id,
546
                "name": name,
547
                "address": address,
548
                "alias": alias,
549
                "isVisible": is_visible,
550
                "isBrowserShortcutEnabled": is_browser_shortcut_enabled,
551
                "remoteNetworkId": remote_network_id,
552
                "securityPolicyId": security_policy_id,
553
                "protocols": protocols,
554
                "tags": tags,
555
            },
556
        )
557
        return NetworkResource(**result["entity"])
1✔
558

559
    def kubernetes_resource_update(
1✔
560
        self: TwingateClientProtocol,
561
        *,
562
        id: str,
563
        name: str,
564
        address: str,
565
        alias: str | None,
566
        is_visible: bool,
567
        remote_network_id: str,
568
        security_policy_id: str | None,
569
        protocols: dict[str, Any],
570
        tags: list[dict[str, str]],
571
        proxy_address: str,
572
        certificate_authority_cert: str,
573
    ) -> KubernetesResource | None:
574
        result = self.execute_mutation(
1✔
575
            "kubernetesResourceUpdate",
576
            MUT_UPDATE_KUBERNETES_RESOURCE,
577
            variable_values={
578
                "id": id,
579
                "name": name,
580
                "address": address,
581
                "alias": alias,
582
                "isVisible": is_visible,
583
                "remoteNetworkId": remote_network_id,
584
                "securityPolicyId": security_policy_id,
585
                "protocols": protocols,
586
                "tags": tags,
587
                "proxyAddress": proxy_address,
588
                "certificateAuthorityCert": certificate_authority_cert,
589
            },
590
        )
591
        return KubernetesResource(**result["entity"])
1✔
592

593
    def resource_delete(self: TwingateClientProtocol, resource_id: str) -> bool:
1✔
594
        try:
1✔
595
            result = self.execute_mutation(
1✔
596
                "resourceDelete",
597
                MUT_DELETE_RESOURCE,
598
                variable_values={"id": resource_id},
599
            )
600

601
            return bool(result["ok"])
1✔
602
        except GraphQLMutationError as gql_err:
1✔
603
            if "does not exist" in gql_err.error:
1✔
604
                return True
1✔
605

606
            raise
1✔
607
        except TransportQueryError:
1✔
608
            return False
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc