• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

chiefonboarding / ChiefOnboarding / 25400571889

05 May 2026 08:29PM UTC coverage: 90.612% (-0.4%) from 90.986%
25400571889

Pull #643

github

web-flow
Merge 4e812ec20 into 15d676845
Pull Request #643: Adding new backfill option

8291 of 9150 relevant lines covered (90.61%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.92
back/admin/integrations/models.py
1
import base64
1✔
2
import io
1✔
3
import json
1✔
4
import time
1✔
5
import uuid
1✔
6
from datetime import timedelta
1✔
7
from json.decoder import JSONDecodeError as NativeJSONDecodeError
1✔
8

9
import requests
1✔
10
from django.conf import settings
1✔
11
from django.core.exceptions import ValidationError
1✔
12
from django.db import models
1✔
13
from django.db.models.signals import post_delete
1✔
14
from django.dispatch import receiver
1✔
15
from django.template import Context, Template
1✔
16
from django.template.loader import render_to_string
1✔
17
from django.urls import reverse, reverse_lazy
1✔
18
from django.utils import timezone
1✔
19
from django.utils.crypto import get_random_string
1✔
20
from django.utils.translation import gettext_lazy as _
1✔
21
from django_q.models import Schedule
1✔
22
from django_q.tasks import schedule
1✔
23
from requests.exceptions import (
1✔
24
    HTTPError,
25
    InvalidHeader,
26
    InvalidJSONError,
27
    InvalidSchema,
28
    InvalidURL,
29
    JSONDecodeError,
30
    MissingSchema,
31
    SSLError,
32
    Timeout,
33
    TooManyRedirects,
34
    URLRequired,
35
)
36
from twilio.rest import Client
1✔
37

38
from admin.integrations.exceptions import PritunlMissingCredentialsError
1✔
39
from admin.integrations.helpers.pritunl import pritunl_headers
1✔
40
from admin.integrations.serializers import (
1✔
41
    SyncUsersManifestSerializer,
42
    WebhookManifestSerializer,
43
)
44
from admin.integrations.utils import get_value_from_notation
1✔
45
from misc.fernet_fields import EncryptedTextField
1✔
46
from misc.fields import EncryptedJSONField
1✔
47
from organization.models import Notification
1✔
48
from organization.utils import has_manager_or_buddy_tags, send_email_with_notification
1✔
49

50

51
class IntegrationTracker(models.Model):
1✔
52
    """Model to track the integrations that ran. Gives insights into error messages"""
53

54
    class Category(models.IntegerChoices):
1✔
55
        EXECUTE = 0, _("Run the execute part")
1✔
56
        EXISTS = 1, _("Check if user exists")
1✔
57
        REVOKE = 2, _("Revoke user")
1✔
58

59
    integration = models.ForeignKey(
1✔
60
        "integrations.Integration", on_delete=models.CASCADE, null=True
61
    )
62
    category = models.IntegerField(choices=Category.choices)
1✔
63
    for_user = models.ForeignKey("users.User", on_delete=models.CASCADE, null=True)
1✔
64
    ran_at = models.DateTimeField(auto_now_add=True)
1✔
65

66
    @property
1✔
67
    def ran_execute_block(self):
1✔
68
        return self.category == IntegrationTracker.Category.EXECUTE
×
69

70
    @property
1✔
71
    def ran_exists_block(self):
1✔
72
        return self.category == IntegrationTracker.Category.EXISTS
×
73

74
    @property
1✔
75
    def ran_revoke_block(self):
1✔
76
        return self.category == IntegrationTracker.Category.REVOKE
×
77

78

79
class IntegrationTrackerStep(models.Model):
1✔
80
    tracker = models.ForeignKey(
1✔
81
        "integrations.IntegrationTracker",
82
        on_delete=models.CASCADE,
83
        related_name="steps",
84
    )
85
    status_code = models.IntegerField()
1✔
86
    json_response = models.JSONField()
1✔
87
    text_response = models.TextField()
1✔
88
    url = models.TextField()
1✔
89
    method = models.TextField()
1✔
90
    post_data = models.JSONField()
1✔
91
    headers = models.JSONField()
1✔
92
    expected = models.TextField()
1✔
93
    error = models.TextField()
1✔
94

95
    @property
1✔
96
    def has_succeeded(self):
1✔
97
        return self.status_code >= 200 and self.status_code < 300
×
98

99
    @property
1✔
100
    def found_expected(self):
1✔
101
        if self.expected == "":
1✔
102
            return True
1✔
103

104
        if self.text_response != "":
1✔
105
            return self.expected in self.text_response
×
106
        if len(self.json_response):
1✔
107
            return self.expected in json.dumps(self.json_response)
1✔
108
        return False
×
109

110
    @property
1✔
111
    def pretty_json_response(self):
1✔
112
        return json.dumps(self.json_response, indent=4)
1✔
113

114
    @property
1✔
115
    def pretty_headers(self):
1✔
116
        return json.dumps(self.headers, indent=4)
1✔
117

118
    @property
1✔
119
    def pretty_post_data(self):
1✔
120
        return json.dumps(self.post_data, indent=4)
1✔
121

122

123
class IntegrationManager(models.Manager):
1✔
124
    def get_queryset(self):
1✔
125
        return super().get_queryset()
1✔
126

127
    def sequence_integration_options(self):
1✔
128
        # any webhooks and account provisioning
129
        return self.get_queryset().filter(
1✔
130
            integration=Integration.Type.CUSTOM,
131
            manifest_type__in=[
132
                Integration.ManifestType.WEBHOOK,
133
                Integration.ManifestType.MANUAL_USER_PROVISIONING,
134
            ],
135
        )
136

137
    def account_provision_options(self):
1✔
138
        # only account provisioning (no general webhooks)
139
        return self.get_queryset().filter(
1✔
140
            integration=Integration.Type.CUSTOM,
141
            manifest_type=Integration.ManifestType.WEBHOOK,
142
            manifest__exists__isnull=False,
143
        ) | self.get_queryset().filter(
144
            integration=Integration.Type.CUSTOM,
145
            manifest_type=Integration.ManifestType.MANUAL_USER_PROVISIONING,
146
        )
147

148
    def import_users_options(self):
1✔
149
        # only import user items
150
        return (
1✔
151
            self.get_queryset()
152
            .filter(
153
                integration=Integration.Type.CUSTOM,
154
                manifest_type=Integration.ManifestType.SYNC_USERS,
155
            )
156
            .exclude(manifest__schedule__isnull=False)
157
        )
158

159

160
class IntegrationInactiveManager(models.Manager):
1✔
161
    def get_queryset(self):
1✔
162
        return super().get_queryset().filter(is_active=False)
1✔
163

164

165
class Integration(models.Model):
1✔
166
    class Type(models.IntegerChoices):
1✔
167
        SLACK_BOT = 0, _("Slack bot")
1✔
168
        CUSTOM = 10, _("Custom")
1✔
169

170
    class ManifestType(models.IntegerChoices):
1✔
171
        WEBHOOK = 0, _("Provision user accounts or trigger webhooks")
1✔
172
        SYNC_USERS = 1, _("Sync users")
1✔
173
        MANUAL_USER_PROVISIONING = (
1✔
174
            3,
175
            _("Manual user account provisioning, no manifest required"),
176
        )
177

178
    name = models.CharField(max_length=300, default="", blank=True)
1✔
179
    is_active = models.BooleanField(
1✔
180
        default=True, help_text="If inactive, it's a test/debug integration"
181
    )
182
    integration = models.IntegerField(choices=Type.choices)
1✔
183
    manifest_type = models.IntegerField(
1✔
184
        choices=ManifestType.choices, null=True, blank=True
185
    )
186
    token = EncryptedTextField(max_length=10000, default="", blank=True)
1✔
187
    refresh_token = EncryptedTextField(max_length=10000, default="", blank=True)
1✔
188
    base_url = models.CharField(max_length=22300, default="", blank=True)
1✔
189
    redirect_url = models.CharField(max_length=22300, default="", blank=True)
1✔
190
    account_id = models.CharField(max_length=22300, default="", blank=True)
1✔
191
    active = models.BooleanField(default=True)  # legacy?
1✔
192
    ttl = models.IntegerField(null=True, blank=True)
1✔
193
    expiring = models.DateTimeField(auto_now_add=True, blank=True)
1✔
194
    one_time_auth_code = models.UUIDField(
1✔
195
        default=uuid.uuid4, editable=False, unique=True
196
    )
197

198
    manifest = models.JSONField(default=dict, null=True, blank=True)
1✔
199
    extra_args = EncryptedJSONField(default=dict)
1✔
200
    enabled_oauth = models.BooleanField(default=False)
1✔
201

202
    # Slack
203
    app_id = models.CharField(max_length=100, default="")
1✔
204
    client_id = models.CharField(max_length=100, default="")
1✔
205
    client_secret = models.CharField(max_length=100, default="")
1✔
206
    signing_secret = models.CharField(max_length=100, default="")
1✔
207
    verification_token = models.CharField(max_length=100, default="")
1✔
208
    bot_token = EncryptedTextField(max_length=10000, default="", blank=True)
1✔
209
    bot_id = models.CharField(max_length=100, default="")
1✔
210

211
    @property
1✔
212
    def skip_user_provisioning(self):
1✔
213
        return self.manifest_type == Integration.ManifestType.MANUAL_USER_PROVISIONING
1✔
214

215
    @property
1✔
216
    def is_sync_users_integration(self):
1✔
217
        return self.manifest_type == Integration.ManifestType.SYNC_USERS
×
218

219
    @property
1✔
220
    def can_revoke_access(self):
1✔
221
        return len(self.manifest.get("revoke", []))
1✔
222

223
    @property
1✔
224
    def can_backfill_ids(self):
1✔
225
        # True when the manifest's `exists` block declares `store_data`,
226
        # meaning we can extract IDs from the lookup response and backfill
227
        # them into existing users' extra_fields.
228
        return bool(self.manifest.get("exists", {}).get("store_data"))
1✔
229

230
    @property
1✔
231
    def update_url(self):
1✔
232
        return reverse("integrations:update", args=[self.id])
1✔
233

234
    def get_icon_template(self):
1✔
235
        return render_to_string("_integration_config.html")
1✔
236

237
    @property
1✔
238
    def schedule_name(self):
1✔
239
        return f"User sync for integration: {self.id}"
1✔
240

241
    @property
1✔
242
    def secret_values(self):
1✔
243
        return [
1✔
244
            item
245
            for item in self.manifest["initial_data_form"]
246
            if item.get("secret", False) and item.get("name") != "generate"
247
        ]
248

249
    @property
1✔
250
    def missing_secret_values(self):
1✔
251
        return [
×
252
            item for item in self.secret_values if item["id"] not in self.extra_args
253
        ]
254

255
    @property
1✔
256
    def filled_secret_values(self):
1✔
257
        return [item for item in self.secret_values if item["id"] in self.extra_args]
1✔
258

259
    @property
1✔
260
    def requires_assigned_manager_or_buddy(self):
1✔
261
        # returns manager, buddy
262
        return has_manager_or_buddy_tags(self.manifest)
1✔
263

264
    def clean(self):
1✔
265
        if not self.manifest or self.skip_user_provisioning:
1✔
266
            # ignore field if form doesn't have it or no manifest is necessary
267
            return
1✔
268

269
        if self.manifest_type == Integration.ManifestType.WEBHOOK:
1✔
270
            manifest_serializer = WebhookManifestSerializer(data=self.manifest)
1✔
271
        else:
272
            manifest_serializer = SyncUsersManifestSerializer(data=self.manifest)
1✔
273
        if not manifest_serializer.is_valid():
1✔
274
            raise ValidationError({"manifest": json.dumps(manifest_serializer.errors)})
×
275

276
    def save(self, *args, **kwargs):
1✔
277
        super().save(*args, **kwargs)
1✔
278

279
        # skip if it's not a sync user integration (no background jobs for the others)
280
        if self.manifest_type != Integration.ManifestType.SYNC_USERS:
1✔
281
            return
1✔
282

283
        # update the background job based on the manifest
284
        schedule_cron = self.manifest.get("schedule")
1✔
285

286
        try:
1✔
287
            schedule_obj = Schedule.objects.get(name=self.schedule_name)
1✔
288
        except Schedule.DoesNotExist:
1✔
289
            # Schedule does not exist yet, so create it if specified
290
            if schedule_cron:
1✔
291
                schedule(
1✔
292
                    "admin.integrations.tasks.sync_user_info",
293
                    self.id,
294
                    schedule_type=Schedule.CRON,
295
                    cron=schedule_cron,
296
                    name=self.schedule_name,
297
                )
298
            return
1✔
299

300
        # delete if cron was removed
301
        if schedule_cron is None:
1✔
302
            schedule_obj.delete()
1✔
303
            return
1✔
304

305
        # if schedule changed, then update
306
        if schedule_obj.cron != schedule_cron:
×
307
            schedule_obj.cron = schedule_cron
×
308
            schedule_obj.save()
×
309

310
    def register_manual_integration_run(self, user):
1✔
311
        from users.models import IntegrationUser
1✔
312

313
        IntegrationUser.objects.update_or_create(
1✔
314
            user=user,
315
            integration=self,
316
            defaults={"revoked": user.is_offboarding},
317
        )
318

319
    def cast_to_json(self, value):
1✔
320
        try:
1✔
321
            value = json.loads(value)
1✔
322
        except Exception:
1✔
323
            pass
1✔
324

325
        return value
1✔
326

327
    def run_request(self, data):
1✔
328
        url = self._replace_vars(data["url"])
1✔
329
        if "data" in data:
1✔
330
            post_data = self._replace_vars(json.dumps(data["data"]))
1✔
331
        else:
332
            post_data = {}
1✔
333
        if data.get("cast_data_to_json", True):
1✔
334
            post_data = self.cast_to_json(post_data)
1✔
335

336
        error = ""
1✔
337

338
        # extract files from locally saved files and send them with the request
339
        files_to_send = {}
1✔
340
        for field_name, file_name in data.get("files", {}).items():
1✔
341
            try:
1✔
342
                files_to_send[field_name] = (file_name, self.params["files"][file_name])
1✔
343
            except KeyError:
1✔
344
                error = f"{file_name} could not be found in the locally saved files"
1✔
345
                if hasattr(self, "tracker"):
1✔
346
                    IntegrationTrackerStep.objects.create(
1✔
347
                        status_code=0,
348
                        tracker=self.tracker,
349
                        json_response={},
350
                        text_response=error,
351
                        url=self.clean_response(url),
352
                        method=data.get("method", "POST"),
353
                        post_data=json.loads(
354
                            self.clean_response(self.cast_to_json(post_data))
355
                        ),
356
                        headers=json.loads(
357
                            self.clean_response(self.headers(data.get("headers", {})))
358
                        ),
359
                        error=error,
360
                    )
361
                return False, error
1✔
362

363
        response = None
1✔
364
        headers = self.headers(data.get("headers", {}))
1✔
365
        try:
1✔
366
            if data.get("extra_headers", "") == "pritunl":
1✔
367
                headers.update(
1✔
368
                    pritunl_headers(data.get("method", "POST"), url, self.extra_args)
369
                )
370
            response = requests.request(
1✔
371
                data.get("method", "POST"),
372
                url,
373
                headers=headers,
374
                data=post_data,
375
                files=files_to_send,
376
                timeout=120,
377
            )
378
        except PritunlMissingCredentialsError as e:
1✔
379
            error = str(e)
1✔
380

381
        except (InvalidJSONError, JSONDecodeError):
1✔
382
            error = "JSON is invalid"
×
383

384
        except HTTPError:
1✔
385
            error = "An HTTP error occurred"
×
386

387
        except SSLError:
1✔
388
            error = "An SSL error occurred"
×
389

390
        except Timeout:
1✔
391
            error = "The request timed out"
1✔
392

393
        except (URLRequired, MissingSchema, InvalidSchema, InvalidURL):
1✔
394
            error = "The url is invalid"
1✔
395

396
        except TooManyRedirects:
×
397
            error = "There are too many redirects"
×
398

399
        except InvalidHeader:
×
400
            error = "The header is invalid"
×
401

402
        except:  # noqa E722
×
403
            error = "There was an unexpected error with the request"
×
404

405
        if response is not None and error == "":
1✔
406
            if len(data.get("status_code", [])) and str(
1✔
407
                response.status_code
408
            ) not in data.get("status_code", []):
409
                error = f"Status code ({response.status_code}) not in allowed list ({data.get('status_code')})"
×
410

411
        try:
1✔
412
            json_response = response.json()
1✔
413
            text_response = ""
1✔
414
        except:  # noqa E722
1✔
415
            json_response = {}
1✔
416
            if error:
1✔
417
                text_response = error
1✔
418
            else:
419
                text_response = response.text
1✔
420

421
        if hasattr(self, "tracker"):
1✔
422
            # TODO: JSON needs to be refactored
423
            try:
1✔
424
                json_payload = json.loads(self.clean_response(json_response))
1✔
425
            except (NativeJSONDecodeError, TypeError):
×
426
                json_payload = self.clean_response(json_response)
×
427

428
            try:
1✔
429
                json_post_payload = json.loads(
1✔
430
                    self.clean_response(self.cast_to_json(post_data))
431
                )
432
            except (NativeJSONDecodeError, TypeError):
×
433
                json_post_payload = self.clean_response(self.cast_to_json(post_data))
×
434

435
            try:
1✔
436
                json_headers_payload = json.loads(self.clean_response(headers))
1✔
437
            except (NativeJSONDecodeError, TypeError):
×
438
                json_headers_payload = self.clean_response(headers)
×
439

440
            IntegrationTrackerStep.objects.create(
1✔
441
                status_code=0 if response is None else response.status_code,
442
                tracker=self.tracker,
443
                json_response=json_payload,
444
                text_response=(
445
                    "Cannot display, could be file"
446
                    if data.get("save_as_file", False)
447
                    else self.clean_response(text_response)
448
                ),
449
                url=self.clean_response(url),
450
                method=data.get("method", "POST"),
451
                post_data=json_post_payload,
452
                headers=json_headers_payload,
453
                expected=self._replace_vars(data.get("expected", "")),
454
                error=self.clean_response(error),
455
            )
456

457
        if error:
1✔
458
            return False, error
1✔
459

460
        return True, response
1✔
461

462
    def _replace_vars(self, text):
1✔
463
        params = {} if not hasattr(self, "params") else self.params
1✔
464
        if self.pk is not None:
1✔
465
            params["redirect_url"] = settings.BASE_URL + reverse_lazy(
1✔
466
                "integrations:oauth-callback", args=[self.id]
467
            )
468
        if hasattr(self, "new_hire") and self.new_hire is not None:
1✔
469
            text = self.new_hire.personalize(text, self.extra_args | params)
1✔
470
            return text
1✔
471
        t = Template(text)
1✔
472
        context = Context(self.extra_args | params)
1✔
473
        text = t.render(context)
1✔
474
        return text
1✔
475

476
    @property
1✔
477
    def has_oauth(self):
1✔
478
        return "oauth" in self.manifest and len(self.manifest.get("oauth", {}))
1✔
479

480
    def headers(self, headers=None):
1✔
481
        if headers is None:
1✔
482
            headers = {}
1✔
483

484
        headers = (
1✔
485
            self.manifest.get("headers", {}).items()
486
            if len(headers) == 0
487
            else headers.items()
488
        )
489
        new_headers = {}
1✔
490
        for key, value in headers:
1✔
491
            # If Basic authentication then swap to base64
492
            if key == "Authorization" and value.startswith("Basic"):
1✔
493
                auth_details = self._replace_vars(value.split(" ", 1)[1])
1✔
494
                value = "Basic " + base64.b64encode(
1✔
495
                    auth_details.encode("ascii")
496
                ).decode("ascii")
497

498
            # Adding an empty string to force to return a string instead of a
499
            # safestring. Ref: https://github.com/psf/requests/issues/6159
500
            new_headers[self._replace_vars(key) + ""] = self._replace_vars(value) + ""
1✔
501
        return new_headers
1✔
502

503
    def user_exists(self, new_hire, save_result=True):
1✔
504
        from users.models import IntegrationUser
1✔
505

506
        # check if user has been created manually
507
        if self.skip_user_provisioning:
1✔
508
            try:
1✔
509
                user_integration = IntegrationUser.objects.get(
1✔
510
                    user=new_hire, integration=self
511
                )
512
            except IntegrationUser.DoesNotExist:
1✔
513
                return False
1✔
514

515
            return not user_integration.revoked
1✔
516

517
        if not len(self.manifest.get("exists", [])):
1✔
518
            return None
×
519

520
        self.tracker = IntegrationTracker.objects.create(
1✔
521
            category=IntegrationTracker.Category.EXISTS,
522
            integration=self,
523
            for_user=new_hire,
524
        )
525

526
        self.new_hire = new_hire
1✔
527
        self.has_user_context = new_hire is not None
1✔
528

529
        # Renew token if necessary
530
        if not self.renew_key():
1✔
531
            return
×
532

533
        success, response = self.run_request(self.manifest["exists"])
1✔
534

535
        if not success:
1✔
536
            return None
1✔
537

538
        user_exists = self.tracker.steps.last().found_expected
1✔
539

540
        # If the user was found and the manifest declares store_data on its
541
        # exists block, capture those values into extra_fields. Lets a single
542
        # lookup populate IDs (e.g. ATLASSIAN_USER_ID, bitwarden_id) for users
543
        # that pre-existed in the upstream system.
544
        store_data = self.manifest["exists"].get("store_data", {})
1✔
545
        if user_exists and store_data:
1✔
546
            try:
×
547
                json_response = response.json()
×
548
            except (ValueError, AttributeError):
×
549
                json_response = {}
×
550
            for new_hire_prop, notation in store_data.items():
×
551
                try:
×
552
                    value = get_value_from_notation(
×
553
                        self._replace_vars(notation), json_response
554
                    )
555
                except KeyError:
×
556
                    continue
×
557
                if value is None:
×
558
                    continue
×
559
                new_hire.extra_fields[new_hire_prop] = value
×
560
            new_hire.save()
×
561

562
        if save_result:
1✔
563
            IntegrationUser.objects.update_or_create(
1✔
564
                integration=self, user=new_hire, defaults={"revoked": not user_exists}
565
            )
566

567
        return user_exists
1✔
568

569
    def needs_user_info(self, user):
1✔
570
        if self.skip_user_provisioning:
1✔
571
            return False
1✔
572

573
        # form created from the manifest, this info is always needed to create a new
574
        # account. Check if there is anything that needs to be filled
575
        form = self.manifest.get("form", [])
1✔
576

577
        # extra items that are needed from the integration (often prefilled by admin)
578
        extra_user_info = self.manifest.get("extra_user_info", [])
1✔
579
        needs_more_info = any(
1✔
580
            item["id"] not in user.extra_fields.keys() for item in extra_user_info
581
        )
582

583
        return len(form) > 0 or needs_more_info
1✔
584

585
    def revoke_user(self, user):
1✔
586
        if self.skip_user_provisioning:
1✔
587
            # should never be triggered
588
            return False, "Cannot revoke manual integration"
1✔
589

590
        self.new_hire = user
1✔
591
        self.has_user_context = True
1✔
592

593
        # Renew token if necessary
594
        if not self.renew_key():
1✔
595
            return False, "Couldn't renew key"
×
596

597
        revoke_manifest = self.manifest.get("revoke", [])
1✔
598

599
        # add extra fields directly to params
600
        self.params = self.new_hire.extra_fields
1✔
601
        self.tracker = IntegrationTracker.objects.create(
1✔
602
            category=IntegrationTracker.Category.REVOKE,
603
            integration=self if self.pk is not None else None,
604
            for_user=self.new_hire,
605
        )
606

607
        for item in revoke_manifest:
1✔
608
            success, response = self.run_request(item)
1✔
609

610
            if not success or not self.tracker.steps.last().found_expected:
1✔
611
                return False, self.clean_response(response)
1✔
612

613
        return True, ""
1✔
614

615
    def renew_key(self):
1✔
616
        # Oauth2 refreshing access token if needed
617
        success = True
1✔
618
        if (
1✔
619
            self.has_oauth
620
            and "expires_in" in self.extra_args.get("oauth", {})
621
            and self.expiring < timezone.now()
622
        ):
623
            success, response = self.run_request(self.manifest["oauth"]["refresh"])
1✔
624

625
            if not success:
1✔
626
                user = self.new_hire if self.has_user_context else None
1✔
627
                Notification.objects.create(
1✔
628
                    notification_type=Notification.Type.FAILED_INTEGRATION,
629
                    extra_text=self.name,
630
                    created_for=user,
631
                    description="Refresh url: " + str(response),
632
                )
633
                return success
1✔
634

635
            self.extra_args["oauth"] |= response.json()
1✔
636
            if "expires_in" in response.json():
1✔
637
                self.expiring = timezone.now() + timedelta(
1✔
638
                    seconds=response.json()["expires_in"]
639
                )
640
            self.save(update_fields=["expiring", "extra_args"])
1✔
641
            if hasattr(self, "tracker"):
1✔
642
                # we need to clean the last step as we now probably got new secret keys
643
                # that need to be masked
644
                last_step = self.tracker.steps.last()
×
645
                last_step.json_response = self.clean_response(last_step.json_response)
×
646
                last_step.save()
×
647

648
        return success
1✔
649

650
    def _check_condition(self, response, condition):
1✔
651
        value = self._replace_vars(condition.get("value"))
1✔
652
        try:
1✔
653
            # first argument will be taken from the response
654
            response_value = get_value_from_notation(
1✔
655
                condition.get("response_notation"), response.json()
656
            )
657
        except KeyError:
×
658
            # we know that the result might not be in the response yet, as we are
659
            # waiting for the correct response, so just respond with an empty string
660
            response_value = ""
×
661
        return value == response_value
1✔
662

663
    def _polling(self, item, response):
1✔
664
        polling = item.get("polling")
1✔
665
        continue_if = item.get("continue_if")
1✔
666
        interval = polling.get("interval")
1✔
667
        amount = polling.get("amount")
1✔
668

669
        got_expected_result = self._check_condition(response, continue_if)
1✔
670
        if got_expected_result:
1✔
671
            return True, response
×
672

673
        tried = 1
1✔
674
        while amount > tried:
1✔
675
            time.sleep(interval)
1✔
676
            success, response = self.run_request(item)
1✔
677
            got_expected_result = self._check_condition(response, continue_if)
1✔
678
            if got_expected_result:
1✔
679
                return True, response
1✔
680
            tried += 1
1✔
681
        # if exceeding the max amounts, then fail
682
        return False, response
1✔
683

684
    def execute(self, new_hire=None, params=None, retry_on_failure=False):
1✔
685
        self.params = params or {}
1✔
686
        self.params["responses"] = []
1✔
687
        self.params["files"] = {}
1✔
688
        self.new_hire = new_hire
1✔
689
        self.has_user_context = new_hire is not None
1✔
690

691
        self.tracker = IntegrationTracker.objects.create(
1✔
692
            category=IntegrationTracker.Category.EXECUTE,
693
            integration=self if self.pk is not None else None,
694
            for_user=self.new_hire,
695
        )
696

697
        if self.has_user_context:
1✔
698
            self.params |= new_hire.extra_fields
1✔
699
            self.new_hire = new_hire
1✔
700

701
        # Renew token if necessary
702
        if not self.renew_key():
1✔
703
            return False, None
×
704

705
        # Add generated secrets
706
        for item in self.manifest.get("initial_data_form", []):
1✔
707
            if "name" in item and item["name"] == "generate":
1✔
708
                self.extra_args[item["id"]] = get_random_string(length=10)
1✔
709

710
        response = None
1✔
711
        # Run all requests
712
        for item in self.manifest["execute"]:
1✔
713
            success, response = self.run_request(item)
1✔
714

715
            # check if we need to poll before continuing
716
            if polling := item.get("polling", False):
1✔
717
                success, response = self._polling(item, response)
1✔
718

719
            # check if we need to block this integration based on condition
720
            if continue_if := item.get("continue_if", False):
1✔
721
                got_expected_result = self._check_condition(response, continue_if)
1✔
722
                if not got_expected_result:
1✔
723
                    response = self.clean_response(response=response)
1✔
724
                    Notification.objects.create(
1✔
725
                        notification_type=Notification.Type.BLOCKED_INTEGRATION,
726
                        extra_text=self.name,
727
                        created_for=new_hire,
728
                        description=f"Execute url ({item['url']}): {response}",
729
                    )
730
                    return False, response
1✔
731

732
            # No need to retry or log when we are importing users
733
            if not success:
1✔
734
                if self.has_user_context:
1✔
735
                    response = self.clean_response(response=response)
1✔
736
                    if polling:
1✔
737
                        response = "Polling timed out: " + response
×
738
                    Notification.objects.create(
1✔
739
                        notification_type=Notification.Type.FAILED_INTEGRATION,
740
                        extra_text=self.name,
741
                        created_for=new_hire,
742
                        description=f"Execute url ({item['url']}): {response}",
743
                    )
744
                if retry_on_failure:
1✔
745
                    # Retry url in one hour
746
                    schedule(
×
747
                        "admin.integrations.tasks.retry_integration",
748
                        new_hire.id,
749
                        self.id,
750
                        params,
751
                        name=(
752
                            f"Retrying integration {self.id} for new hire {new_hire.id}"
753
                        ),
754
                        next_run=timezone.now() + timedelta(hours=1),
755
                        schedule_type=Schedule.ONCE,
756
                    )
757
                return False, response
1✔
758

759
            # save if file, so we can reuse later
760
            save_as_file = item.get("save_as_file")
1✔
761
            if save_as_file is not None:
1✔
762
                self.params["files"][save_as_file] = io.BytesIO(response.content)
1✔
763

764
            # save json response temporarily to be reused in other parts
765
            try:
1✔
766
                self.params["responses"].append(response.json())
1✔
767
            except:  # noqa E722
1✔
768
                # if we save a file, then just append an empty dict
769
                self.params["responses"].append({})
1✔
770

771
            # store data coming back from response to the user, so we can reuse in other
772
            # integrations
773
            if store_data := item.get("store_data", {}):
1✔
774
                for new_hire_prop, notation_for_response in store_data.items():
1✔
775
                    try:
1✔
776
                        value = get_value_from_notation(
1✔
777
                            notation_for_response, response.json()
778
                        )
779
                    except KeyError:
1✔
780
                        return (
1✔
781
                            False,
782
                            f"Could not store data to new hire: {notation_for_response}"
783
                            f" not found in {self.clean_response(response.json())}",
784
                        )
785

786
                    # save to new hire and to temp var `params` on this model for use in
787
                    # the same integration
788
                    new_hire.extra_fields[new_hire_prop] = value
1✔
789
                    self.params[new_hire_prop] = value
1✔
790
                new_hire.save()
1✔
791

792
        # Run all post requests (notifications)
793
        for item in self.manifest.get("post_execute_notification", []):
1✔
794
            if item["type"] == "email":
1✔
795
                send_email_with_notification(
1✔
796
                    subject=self._replace_vars(item["subject"]),
797
                    message=self._replace_vars(item["message"]),
798
                    to=self._replace_vars(item["to"]),
799
                    notification_type=(
800
                        Notification.Type.SENT_EMAIL_INTEGRATION_NOTIFICATION
801
                    ),
802
                )
803
                return True, None
1✔
804
            else:
805
                try:
×
806
                    client = Client(
×
807
                        settings.TWILIO_ACCOUNT_SID, settings.TWILIO_AUTH_TOKEN
808
                    )
809
                    client.messages.create(
×
810
                        to=new_hire.phone,
811
                        from_=settings.TWILIO_FROM_NUMBER,
812
                        body=self._replace_vars(item["message"]),
813
                    )
814
                except Exception:
×
815
                    Notification.objects.create(
×
816
                        notification_type=(
817
                            Notification.Type.FAILED_TEXT_INTEGRATION_NOTIFICATION
818
                        ),
819
                        extra_text=self.name,
820
                        created_for=new_hire,
821
                    )
822
                    return True, None
×
823

824
        # Succesfully ran integration, add notification only when we are provisioning
825
        # access
826
        if self.has_user_context:
1✔
827
            Notification.objects.create(
1✔
828
                notification_type=Notification.Type.RAN_INTEGRATION,
829
                extra_text=self.name,
830
                created_for=new_hire,
831
            )
832
        return True, response
1✔
833

834
    def config_form(self, data=None):
1✔
835
        if self.skip_user_provisioning:
1✔
836
            from .forms import ManualIntegrationConfigForm
1✔
837

838
            return ManualIntegrationConfigForm(data=data)
1✔
839

840
        from .forms import IntegrationConfigForm
1✔
841

842
        return IntegrationConfigForm(instance=self, data=data)
1✔
843

844
    def clean_response(self, response) -> str:
1✔
845
        if not isinstance(response, str):
1✔
846
            try:
1✔
847
                response = json.dumps(response)
1✔
848
            except (TypeError, ValueError):
1✔
849
                response = str(response)
1✔
850

851
        for name, value in self.extra_args.items():
1✔
852
            if isinstance(value, dict):
1✔
853
                for inner_name, inner_value in value.items():
×
854
                    response = response.replace(
×
855
                        str(inner_value),
856
                        _("***Secret value for %(name)s***")
857
                        % {"name": name + "." + inner_name},
858
                    )
859
            elif value != "":
1✔
860
                response = response.replace(
1✔
861
                    str(value), _("***Secret value for %(name)s***") % {"name": name}
862
                )
863

864
            if name == "Authorization" and value.startswith("Basic"):
1✔
865
                response = response.replace(
×
866
                    base64.b64encode(value.split(" ", 1)[1].encode("ascii")).decode(
867
                        "ascii"
868
                    ),
869
                    "BASE64 ENCODED SECRET",
870
                )
871

872
        return response
1✔
873

874
    objects = IntegrationManager()
1✔
875
    inactive = IntegrationInactiveManager()
1✔
876

877

878
@receiver(post_delete, sender=Integration)
1✔
879
def delete_schedule(sender, instance, **kwargs):
1✔
880
    Schedule.objects.filter(name=instance.schedule_name).delete()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc