• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

chiefonboarding / ChiefOnboarding / 8211573286

09 Mar 2024 02:31AM UTC coverage: 92.102% (+0.03%) from 92.074%
8211573286

Pull #438

github

web-flow
Merge 660291404 into 4976f25de
Pull Request #438: Add option to make initial data secret

6670 of 7242 relevant lines covered (92.1%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.49
back/admin/integrations/models.py
1
import base64
1✔
2
import io
1✔
3
import json
1✔
4
import time
1✔
5
import uuid
1✔
6
from datetime import timedelta
1✔
7
from json.decoder import JSONDecodeError as NativeJSONDecodeError
1✔
8

9
import requests
1✔
10
from django.conf import settings
1✔
11
from django.core.exceptions import ValidationError
1✔
12
from django.db import models
1✔
13
from django.db.models.signals import post_delete
1✔
14
from django.dispatch import receiver
1✔
15
from django.template import Context, Template
1✔
16
from django.template.loader import render_to_string
1✔
17
from django.urls import reverse, reverse_lazy
1✔
18
from django.utils import timezone
1✔
19
from django.utils.crypto import get_random_string
1✔
20
from django.utils.translation import gettext_lazy as _
1✔
21
from django_q.models import Schedule
1✔
22
from django_q.tasks import schedule
1✔
23
from requests.exceptions import (
1✔
24
    HTTPError,
25
    InvalidHeader,
26
    InvalidJSONError,
27
    InvalidSchema,
28
    InvalidURL,
29
    JSONDecodeError,
30
    MissingSchema,
31
    SSLError,
32
    Timeout,
33
    TooManyRedirects,
34
    URLRequired,
35
)
36
from twilio.rest import Client
1✔
37

38
from admin.integrations.serializers import (
1✔
39
    SyncUsersManifestSerializer,
40
    WebhookManifestSerializer,
41
)
42
from admin.integrations.utils import get_value_from_notation
1✔
43
from misc.fernet_fields import EncryptedTextField
1✔
44
from misc.fields import EncryptedJSONField
1✔
45
from organization.models import Notification
1✔
46
from organization.utils import has_manager_or_buddy_tags, send_email_with_notification
1✔
47

48

49
class IntegrationTracker(models.Model):
1✔
50
    """Model to track the integrations that ran. Gives insights into error messages"""
51

52
    class Category(models.IntegerChoices):
1✔
53
        EXECUTE = 0, _("Run the execute part")
1✔
54
        EXISTS = 1, _("Check if user exists")
1✔
55
        REVOKE = 2, _("Revoke user")
1✔
56

57
    integration = models.ForeignKey(
1✔
58
        "integrations.Integration", on_delete=models.CASCADE, null=True
59
    )
60
    category = models.IntegerField(choices=Category.choices)
1✔
61
    for_user = models.ForeignKey("users.User", on_delete=models.CASCADE, null=True)
1✔
62
    ran_at = models.DateTimeField(auto_now_add=True)
1✔
63

64
    @property
1✔
65
    def ran_execute_block(self):
1✔
66
        return self.category == IntegrationTracker.Category.EXECUTE
×
67

68
    @property
1✔
69
    def ran_exists_block(self):
1✔
70
        return self.category == IntegrationTracker.Category.EXISTS
×
71

72
    @property
1✔
73
    def ran_revoke_block(self):
1✔
74
        return self.category == IntegrationTracker.Category.REVOKE
×
75

76

77
class IntegrationTrackerStep(models.Model):
1✔
78
    tracker = models.ForeignKey(
1✔
79
        "integrations.IntegrationTracker",
80
        on_delete=models.CASCADE,
81
        related_name="steps",
82
    )
83
    status_code = models.IntegerField()
1✔
84
    json_response = models.JSONField()
1✔
85
    text_response = models.TextField()
1✔
86
    url = models.TextField()
1✔
87
    method = models.TextField()
1✔
88
    post_data = models.JSONField()
1✔
89
    headers = models.JSONField()
1✔
90
    error = models.TextField()
1✔
91

92
    @property
1✔
93
    def has_succeeded(self):
1✔
94
        return self.status_code >= 200 and self.status_code < 300
×
95

96
    @property
1✔
97
    def pretty_json_response(self):
1✔
98
        return json.dumps(self.json_response, indent=4)
1✔
99

100
    @property
1✔
101
    def pretty_headers(self):
1✔
102
        return json.dumps(self.headers, indent=4)
1✔
103

104
    @property
1✔
105
    def pretty_post_data(self):
1✔
106
        return json.dumps(self.post_data, indent=4)
1✔
107

108

109
class IntegrationManager(models.Manager):
1✔
110
    def get_queryset(self):
1✔
111
        return super().get_queryset()
1✔
112

113
    def sequence_integration_options(self):
1✔
114
        # any webhooks and account provisioning
115
        return self.get_queryset().filter(
1✔
116
            integration=Integration.Type.CUSTOM,
117
            manifest_type__in=[
118
                Integration.ManifestType.WEBHOOK,
119
                Integration.ManifestType.MANUAL_USER_PROVISIONING,
120
            ],
121
        )
122

123
    def account_provision_options(self):
1✔
124
        # only account provisioning (no general webhooks)
125
        return self.get_queryset().filter(
1✔
126
            integration=Integration.Type.CUSTOM,
127
            manifest_type=Integration.ManifestType.WEBHOOK,
128
            manifest__exists__isnull=False,
129
        ) | self.get_queryset().filter(
130
            integration=Integration.Type.CUSTOM,
131
            manifest_type=Integration.ManifestType.MANUAL_USER_PROVISIONING,
132
        )
133

134
    def import_users_options(self):
1✔
135
        # only import user items
136
        return (
1✔
137
            self.get_queryset()
138
            .filter(
139
                integration=Integration.Type.CUSTOM,
140
                manifest_type=Integration.ManifestType.SYNC_USERS,
141
            )
142
            .exclude(manifest__schedule__isnull=False)
143
        )
144

145

146
class Integration(models.Model):
1✔
147
    class Type(models.IntegerChoices):
1✔
148
        SLACK_BOT = 0, _("Slack bot")
1✔
149
        SLACK_ACCOUNT_CREATION = 1, _("Slack account creation")  # legacy
1✔
150
        GOOGLE_ACCOUNT_CREATION = 2, _("Google account creation")  # legacy
1✔
151
        GOOGLE_LOGIN = 3, _("Google Login")
1✔
152
        ASANA = 4, _("Asana")  # legacy
1✔
153
        CUSTOM = 10, _("Custom")
1✔
154

155
    class ManifestType(models.IntegerChoices):
1✔
156
        WEBHOOK = 0, _("Provision user accounts or trigger webhooks")
1✔
157
        SYNC_USERS = 1, _("Sync users")
1✔
158
        MANUAL_USER_PROVISIONING = (
1✔
159
            3,
160
            _("Manual user account provisioning, no manifest required"),
161
        )
162

163
    name = models.CharField(max_length=300, default="", blank=True)
1✔
164
    integration = models.IntegerField(choices=Type.choices)
1✔
165
    manifest_type = models.IntegerField(
1✔
166
        choices=ManifestType.choices, null=True, blank=True
167
    )
168
    token = EncryptedTextField(max_length=10000, default="", blank=True)
1✔
169
    refresh_token = EncryptedTextField(max_length=10000, default="", blank=True)
1✔
170
    base_url = models.CharField(max_length=22300, default="", blank=True)
1✔
171
    redirect_url = models.CharField(max_length=22300, default="", blank=True)
1✔
172
    account_id = models.CharField(max_length=22300, default="", blank=True)
1✔
173
    active = models.BooleanField(default=True)
1✔
174
    ttl = models.IntegerField(null=True, blank=True)
1✔
175
    expiring = models.DateTimeField(auto_now_add=True, blank=True)
1✔
176
    one_time_auth_code = models.UUIDField(
1✔
177
        default=uuid.uuid4, editable=False, unique=True
178
    )
179

180
    manifest = models.JSONField(default=dict, null=True, blank=True)
1✔
181
    extra_args = EncryptedJSONField(default=dict)
1✔
182
    enabled_oauth = models.BooleanField(default=False)
1✔
183

184
    # Slack
185
    app_id = models.CharField(max_length=100, default="")
1✔
186
    client_id = models.CharField(max_length=100, default="")
1✔
187
    client_secret = models.CharField(max_length=100, default="")
1✔
188
    signing_secret = models.CharField(max_length=100, default="")
1✔
189
    verification_token = models.CharField(max_length=100, default="")
1✔
190
    bot_token = EncryptedTextField(max_length=10000, default="", blank=True)
1✔
191
    bot_id = models.CharField(max_length=100, default="")
1✔
192

193
    @property
1✔
194
    def skip_user_provisioning(self):
1✔
195
        return self.manifest_type == Integration.ManifestType.MANUAL_USER_PROVISIONING
1✔
196

197
    @property
1✔
198
    def can_revoke_access(self):
1✔
199
        return self.manifest.get("revoke", False)
1✔
200

201
    @property
1✔
202
    def update_url(self):
1✔
203
        return reverse("integrations:update", args=[self.id])
1✔
204

205
    def get_icon_template(self):
1✔
206
        return render_to_string("_integration_config.html")
1✔
207

208
    @property
1✔
209
    def schedule_name(self):
1✔
210
        return f"User sync for integration: {self.id}"
1✔
211

212
    @property
1✔
213
    def secret_values(self):
1✔
214
        print(self.manifest["initial_data_form"])
1✔
215
        return [
1✔
216
            item
217
            for item in self.manifest["initial_data_form"]
218
            if item.get("secret", False) and item.get("name") != "generate"
219
        ]
220

221
    @property
1✔
222
    def missing_secret_values(self):
1✔
223
        return [
×
224
            item for item in self.secret_values if item["id"] not in self.extra_args
225
        ]
226

227
    @property
1✔
228
    def filled_secret_values(self):
1✔
229
        return [item for item in self.secret_values if item["id"] in self.extra_args]
1✔
230

231
    @property
1✔
232
    def requires_assigned_manager_or_buddy(self):
1✔
233
        # returns manager, buddy
234
        return has_manager_or_buddy_tags(self.manifest)
1✔
235

236
    def clean(self):
1✔
237
        if not self.manifest or self.skip_user_provisioning:
1✔
238
            # ignore field if form doesn't have it or no manifest is necessary
239
            return
1✔
240

241
        if self.manifest_type == Integration.ManifestType.WEBHOOK:
1✔
242
            manifest_serializer = WebhookManifestSerializer(data=self.manifest)
1✔
243
        else:
244
            manifest_serializer = SyncUsersManifestSerializer(data=self.manifest)
1✔
245
        if not manifest_serializer.is_valid():
1✔
246
            raise ValidationError({"manifest": json.dumps(manifest_serializer.errors)})
×
247

248
    def save(self, *args, **kwargs):
1✔
249
        super().save(*args, **kwargs)
1✔
250

251
        # skip if it's not a sync user integration (no background jobs for the others)
252
        if self.manifest_type != Integration.ManifestType.SYNC_USERS:
1✔
253
            return
1✔
254

255
        # update the background job based on the manifest
256
        schedule_cron = self.manifest.get("schedule")
1✔
257

258
        try:
1✔
259
            schedule_obj = Schedule.objects.get(name=self.schedule_name)
1✔
260
        except Schedule.DoesNotExist:
1✔
261
            # Schedule does not exist yet, so create it if specified
262
            if schedule_cron:
1✔
263
                schedule(
1✔
264
                    "admin.integrations.tasks.sync_user_info",
265
                    self.id,
266
                    schedule_type=Schedule.CRON,
267
                    cron=schedule_cron,
268
                    name=self.schedule_name,
269
                )
270
            return
1✔
271

272
        # delete if cron was removed
273
        if schedule_cron is None:
1✔
274
            schedule_obj.delete()
1✔
275
            return
1✔
276

277
        # if schedule changed, then update
278
        if schedule_obj.cron != schedule_cron:
×
279
            schedule_obj.cron = schedule_cron
×
280
            schedule_obj.save()
×
281

282
    def register_manual_integration_run(self, user):
1✔
283
        from users.models import IntegrationUser
1✔
284

285
        integration_user, created = IntegrationUser.objects.update_or_create(
1✔
286
            user=user,
287
            integration=self,
288
            defaults={"revoked": user.is_offboarding},
289
        )
290

291
    def cast_to_json(self, value):
1✔
292
        try:
1✔
293
            value = json.loads(value)
1✔
294
        except Exception:
1✔
295
            pass
1✔
296

297
        return value
1✔
298

299
    def run_request(self, data):
1✔
300
        url = self._replace_vars(data["url"])
1✔
301
        if "data" in data:
1✔
302
            post_data = self._replace_vars(json.dumps(data["data"]))
1✔
303
        else:
304
            post_data = {}
1✔
305
        if data.get("cast_data_to_json", False):
1✔
306
            post_data = self.cast_to_json(post_data)
×
307

308
        error = ""
1✔
309

310
        # extract files from locally saved files and send them with the request
311
        files_to_send = {}
1✔
312
        for field_name, file_name in data.get("files", {}).items():
1✔
313
            try:
1✔
314
                files_to_send[field_name] = (file_name, self.params["files"][file_name])
1✔
315
            except KeyError:
1✔
316
                error = f"{file_name} could not be found in the locally saved files"
1✔
317
                if hasattr(self, "tracker"):
1✔
318
                    IntegrationTrackerStep.objects.create(
1✔
319
                        status_code=0,
320
                        tracker=self.tracker,
321
                        json_response={},
322
                        text_response=error,
323
                        url=self.clean_response(url),
324
                        method=data.get("method", "POST"),
325
                        post_data=json.loads(
326
                            self.clean_response(self.cast_to_json(post_data))
327
                        ),
328
                        headers=json.loads(
329
                            self.clean_response(self.headers(data.get("headers", {})))
330
                        ),
331
                        error=error,
332
                    )
333
                return False, error
1✔
334

335
        response = None
1✔
336
        try:
1✔
337
            response = requests.request(
1✔
338
                data.get("method", "POST"),
339
                url,
340
                headers=self.headers(data.get("headers", {})),
341
                data=post_data,
342
                files=files_to_send,
343
                timeout=120,
344
            )
345
        except (InvalidJSONError, JSONDecodeError):
1✔
346
            error = "JSON is invalid"
×
347

348
        except HTTPError:
1✔
349
            error = "An HTTP error occurred"
×
350

351
        except SSLError:
1✔
352
            error = "An SSL error occurred"
×
353

354
        except Timeout:
1✔
355
            error = "The request timed out"
×
356

357
        except (URLRequired, MissingSchema, InvalidSchema, InvalidURL):
1✔
358
            error = "The url is invalid"
1✔
359

360
        except TooManyRedirects:
×
361
            error = "There are too many redirects"
×
362

363
        except InvalidHeader:
×
364
            error = "The header is invalid"
×
365

366
        except:  # noqa E722
×
367
            error = "There was an unexpected error with the request"
×
368

369
        if error == "" and data.get("fail_when_4xx_response_code", True):
1✔
370
            try:
1✔
371
                response.raise_for_status()
1✔
372
            except Exception:
1✔
373
                error = response.text
1✔
374

375
        try:
1✔
376
            json_response = response.json()
1✔
377
            text_response = ""
1✔
378
        except:  # noqa E722
1✔
379
            json_response = {}
1✔
380
            if error:
1✔
381
                text_response = error
1✔
382
            else:
383
                text_response = response.text
×
384

385
        if hasattr(self, "tracker"):
1✔
386
            # TODO: JSON needs to be refactored
387
            try:
1✔
388
                json_payload = json.loads(self.clean_response(json_response))
1✔
389
            except NativeJSONDecodeError:
×
390
                json_payload = self.clean_response(json_response)
×
391

392
            try:
1✔
393
                json_post_payload = json.loads(
1✔
394
                    self.clean_response(self.cast_to_json(post_data))
395
                )
396
            except NativeJSONDecodeError:
×
397
                json_post_payload = self.clean_response(self.cast_to_json(post_data))
×
398

399
            try:
1✔
400
                json_headers_payload = json.loads(
1✔
401
                    self.clean_response(self.headers(data.get("headers", {})))
402
                )
403
            except NativeJSONDecodeError:
×
404
                json_headers_payload = self.clean_response(
×
405
                    self.headers(data.get("headers", {}))
406
                )
407

408
            IntegrationTrackerStep.objects.create(
1✔
409
                status_code=0 if response is None else response.status_code,
410
                tracker=self.tracker,
411
                json_response=json_payload,
412
                text_response=(
413
                    "Cannot display, could be file"
414
                    if data.get("save_as_file", False)
415
                    else self.clean_response(text_response)
416
                ),
417
                url=self.clean_response(url),
418
                method=data.get("method", "POST"),
419
                post_data=json_post_payload,
420
                headers=json_headers_payload,
421
                error=self.clean_response(error),
422
            )
423

424
        if error:
1✔
425
            return False, error
1✔
426

427
        return True, response
1✔
428

429
    def _replace_vars(self, text):
1✔
430
        params = {} if not hasattr(self, "params") else self.params
1✔
431
        if self.pk is not None:
1✔
432
            params["redirect_url"] = settings.BASE_URL + reverse_lazy(
1✔
433
                "integrations:oauth-callback", args=[self.id]
434
            )
435
        if hasattr(self, "new_hire") and self.new_hire is not None:
1✔
436
            text = self.new_hire.personalize(text, self.extra_args | params)
1✔
437
            return text
1✔
438
        t = Template(text)
1✔
439
        context = Context(self.extra_args | params)
1✔
440
        text = t.render(context)
1✔
441
        return text
1✔
442

443
    @property
1✔
444
    def has_oauth(self):
1✔
445
        return "oauth" in self.manifest
1✔
446

447
    def headers(self, headers=None):
1✔
448
        if headers is None:
1✔
449
            headers = {}
1✔
450

451
        headers = (
1✔
452
            self.manifest.get("headers", {}).items()
453
            if len(headers) == 0
454
            else headers.items()
455
        )
456
        new_headers = {}
1✔
457
        for key, value in headers:
1✔
458
            # If Basic authentication then swap to base64
459
            if key == "Authorization" and value.startswith("Basic"):
1✔
460
                auth_details = self._replace_vars(value.split(" ", 1)[1])
1✔
461
                value = "Basic " + base64.b64encode(
1✔
462
                    auth_details.encode("ascii")
463
                ).decode("ascii")
464

465
            # Adding an empty string to force to return a string instead of a
466
            # safestring. Ref: https://github.com/psf/requests/issues/6159
467
            new_headers[self._replace_vars(key) + ""] = self._replace_vars(value) + ""
1✔
468
        return new_headers
1✔
469

470
    def user_exists(self, new_hire):
1✔
471
        from users.models import IntegrationUser
1✔
472

473
        # check if user has been created manually
474
        if self.skip_user_provisioning:
1✔
475
            try:
1✔
476
                user_integration = IntegrationUser.objects.get(
1✔
477
                    user=new_hire, integration=self
478
                )
479
            except IntegrationUser.DoesNotExist:
1✔
480
                return False
1✔
481

482
            return not user_integration.revoked
1✔
483

484
        self.tracker = IntegrationTracker.objects.create(
1✔
485
            category=IntegrationTracker.Category.EXISTS,
486
            integration=self,
487
            for_user=new_hire,
488
        )
489

490
        self.new_hire = new_hire
1✔
491
        self.has_user_context = new_hire is not None
1✔
492

493
        # Renew token if necessary
494
        if not self.renew_key():
1✔
495
            return
×
496

497
        success, response = self.run_request(self.manifest["exists"])
1✔
498

499
        if not success:
1✔
500
            return None
1✔
501

502
        user_exists = (
1✔
503
            self._replace_vars(self.manifest["exists"]["expected"]) in response.text
504
        )
505

506
        IntegrationUser.objects.update_or_create(
1✔
507
            integration=self, user=new_hire, defaults={"revoked": not user_exists}
508
        )
509

510
        return user_exists
1✔
511

512
    def test_user_exists(self, new_hire):
1✔
513
        self.new_hire = new_hire
×
514
        self.has_user_context = new_hire is not None
×
515

516
        # Renew token if necessary
517
        if not self.renew_key():
×
518
            return _("Couldn't renew token")
×
519

520
        success, response = self.run_request(self.manifest["exists"])
×
521

522
        if isinstance(response, str):
×
523
            return _("Error when making the request: %(error)s") % {"error": response}
×
524

525
        user_exists = (
×
526
            self._replace_vars(self.manifest["exists"]["expected"]) in response.text
527
        )
528

529
        found_user = "FOUND USER" if user_exists else "COULD NOT FIND USER"
×
530

531
        return f"{found_user} in {response.text}"
×
532

533
    def needs_user_info(self, user):
1✔
534
        if self.skip_user_provisioning:
1✔
535
            return False
1✔
536

537
        # form created from the manifest, this info is always needed to create a new
538
        # account. Check if there is anything that needs to be filled
539
        form = self.manifest.get("form", [])
1✔
540

541
        # extra items that are needed from the integration (often prefilled by admin)
542
        extra_user_info = self.manifest.get("extra_user_info", [])
1✔
543
        needs_more_info = any(
1✔
544
            item["id"] not in user.extra_fields.keys() for item in extra_user_info
545
        )
546

547
        return len(form) > 0 or needs_more_info
1✔
548

549
    def revoke_user(self, user):
1✔
550
        if self.skip_user_provisioning:
1✔
551
            # should never be triggered
552
            return False, "Cannot revoke manual integration"
1✔
553

554
        self.new_hire = user
1✔
555
        self.has_user_context = True
1✔
556

557
        # Renew token if necessary
558
        if not self.renew_key():
1✔
559
            return False, "Couldn't renew key"
×
560

561
        revoke_manifest = self.manifest.get("revoke", [])
1✔
562

563
        # add extra fields directly to params
564
        self.params = self.new_hire.extra_fields
1✔
565
        self.tracker = IntegrationTracker.objects.create(
1✔
566
            category=IntegrationTracker.Category.REVOKE,
567
            integration=self if self.pk is not None else None,
568
            for_user=self.new_hire,
569
        )
570

571
        for item in revoke_manifest:
1✔
572
            success, response = self.run_request(item)
1✔
573

574
            if not success:
1✔
575
                return False, self.clean_response(response)
1✔
576

577
        return True, ""
1✔
578

579
    def renew_key(self):
1✔
580
        # Oauth2 refreshing access token if needed
581
        success = True
1✔
582
        if (
1✔
583
            self.has_oauth
584
            and "expires_in" in self.extra_args.get("oauth", {})
585
            and self.expiring < timezone.now()
586
        ):
587
            success, response = self.run_request(self.manifest["oauth"]["refresh"])
1✔
588

589
            if not success:
1✔
590
                user = self.new_hire if self.has_user_context else None
1✔
591
                Notification.objects.create(
1✔
592
                    notification_type=Notification.Type.FAILED_INTEGRATION,
593
                    extra_text=self.name,
594
                    created_for=user,
595
                    description="Refresh url: " + str(response),
596
                )
597
                return success
1✔
598

599
            self.extra_args["oauth"] |= response.json()
1✔
600
            if "expires_in" in response.json():
1✔
601
                self.expiring = timezone.now() + timedelta(
1✔
602
                    seconds=response.json()["expires_in"]
603
                )
604
            self.save(update_fields=["expiring", "extra_args"])
1✔
605
            if hasattr(self, "tracker"):
1✔
606
                # we need to clean the last step as we now probably got new secret keys
607
                # that need to be masked
608
                last_step = self.tracker.steps.last()
×
609
                last_step.json_response = self.clean_response(last_step.json_response)
×
610
                last_step.save()
×
611

612
        return success
1✔
613

614
    def _check_condition(self, response, condition):
1✔
615
        value = self._replace_vars(condition.get("value"))
1✔
616
        try:
1✔
617
            # first argument will be taken from the response
618
            response_value = get_value_from_notation(
1✔
619
                condition.get("response_notation"), response.json()
620
            )
621
        except KeyError:
×
622
            # we know that the result might not be in the response yet, as we are
623
            # waiting for the correct response, so just respond with an empty string
624
            response_value = ""
×
625
        return value == response_value
1✔
626

627
    def _polling(self, item, response):
1✔
628
        polling = item.get("polling")
1✔
629
        continue_if = item.get("continue_if")
1✔
630
        interval = polling.get("interval")
1✔
631
        amount = polling.get("amount")
1✔
632

633
        got_expected_result = self._check_condition(response, continue_if)
1✔
634
        if got_expected_result:
1✔
635
            return True, response
×
636

637
        tried = 1
1✔
638
        while amount > tried:
1✔
639
            time.sleep(interval)
1✔
640
            success, response = self.run_request(item)
1✔
641
            got_expected_result = self._check_condition(response, continue_if)
1✔
642
            if got_expected_result:
1✔
643
                return True, response
1✔
644
            tried += 1
1✔
645
        # if exceeding the max amounts, then fail
646
        return False, response
1✔
647

648
    def execute(self, new_hire=None, params=None, retry_on_failure=False):
1✔
649
        self.params = params or {}
1✔
650
        self.params["responses"] = []
1✔
651
        self.params["files"] = {}
1✔
652
        self.new_hire = new_hire
1✔
653
        self.has_user_context = new_hire is not None
1✔
654

655
        self.tracker = IntegrationTracker.objects.create(
1✔
656
            category=IntegrationTracker.Category.EXECUTE,
657
            integration=self if self.pk is not None else None,
658
            for_user=self.new_hire,
659
        )
660

661
        if self.has_user_context:
1✔
662
            self.params |= new_hire.extra_fields
1✔
663
            self.new_hire = new_hire
1✔
664

665
        # Renew token if necessary
666
        if not self.renew_key():
1✔
667
            return False, None
×
668

669
        # Add generated secrets
670
        for item in self.manifest.get("initial_data_form", []):
1✔
671
            if "name" in item and item["name"] == "generate":
1✔
672
                self.extra_args[item["id"]] = get_random_string(length=10)
1✔
673

674
        # Run all requests
675
        for item in self.manifest["execute"]:
1✔
676
            success, response = self.run_request(item)
1✔
677

678
            # check if we need to poll before continuing
679
            if polling := item.get("polling", False):
1✔
680
                success, response = self._polling(item, response)
1✔
681

682
            # check if we need to block this integration based on condition
683
            if continue_if := item.get("continue_if", False):
1✔
684
                got_expected_result = self._check_condition(response, continue_if)
1✔
685
                if not got_expected_result:
1✔
686
                    response = self.clean_response(response=response)
1✔
687
                    Notification.objects.create(
1✔
688
                        notification_type=Notification.Type.BLOCKED_INTEGRATION,
689
                        extra_text=self.name,
690
                        created_for=new_hire,
691
                        description=f"Execute url ({item['url']}): {response}",
692
                    )
693
                    return False, response
1✔
694

695
            # No need to retry or log when we are importing users
696
            if not success:
1✔
697
                if self.has_user_context:
1✔
698
                    response = self.clean_response(response=response)
1✔
699
                    if polling:
1✔
700
                        response = "Polling timed out: " + response
×
701
                    Notification.objects.create(
1✔
702
                        notification_type=Notification.Type.FAILED_INTEGRATION,
703
                        extra_text=self.name,
704
                        created_for=new_hire,
705
                        description=f"Execute url ({item['url']}): {response}",
706
                    )
707
                if retry_on_failure:
1✔
708
                    # Retry url in one hour
709
                    schedule(
1✔
710
                        "admin.integrations.tasks.retry_integration",
711
                        new_hire.id,
712
                        self.id,
713
                        params,
714
                        name=(
715
                            f"Retrying integration {self.id} for new hire {new_hire.id}"
716
                        ),
717
                        next_run=timezone.now() + timedelta(hours=1),
718
                        schedule_type=Schedule.ONCE,
719
                    )
720
                return False, response
1✔
721

722
            # save if file, so we can reuse later
723
            save_as_file = item.get("save_as_file")
1✔
724
            if save_as_file is not None:
1✔
725
                self.params["files"][save_as_file] = io.BytesIO(response.content)
1✔
726

727
            # save json response temporarily to be reused in other parts
728
            try:
1✔
729
                self.params["responses"].append(response.json())
1✔
730
            except:  # noqa E722
×
731
                # if we save a file, then just append an empty dict
732
                self.params["responses"].append({})
×
733

734
            # store data coming back from response to the user, so we can reuse in other
735
            # integrations
736
            if store_data := item.get("store_data", {}):
1✔
737
                for new_hire_prop, notation_for_response in store_data.items():
1✔
738
                    try:
1✔
739
                        value = get_value_from_notation(
1✔
740
                            notation_for_response, response.json()
741
                        )
742
                    except KeyError:
1✔
743
                        return (
1✔
744
                            False,
745
                            f"Could not store data to new hire: {notation_for_response}"
746
                            f" not found in {self.clean_response(response.json())}",
747
                        )
748

749
                    # save to new hire and to temp var `params` on this model for use in
750
                    # the same integration
751
                    new_hire.extra_fields[new_hire_prop] = value
1✔
752
                    self.params[new_hire_prop] = value
1✔
753
                new_hire.save()
1✔
754

755
        # Run all post requests (notifications)
756
        for item in self.manifest.get("post_execute_notification", []):
1✔
757
            if item["type"] == "email":
1✔
758
                send_email_with_notification(
1✔
759
                    subject=self._replace_vars(item["subject"]),
760
                    message=self._replace_vars(item["message"]),
761
                    to=self._replace_vars(item["to"]),
762
                    notification_type=(
763
                        Notification.Type.SENT_EMAIL_INTEGRATION_NOTIFICATION
764
                    ),
765
                )
766
                return True, None
1✔
767
            else:
768
                try:
×
769
                    client = Client(
×
770
                        settings.TWILIO_ACCOUNT_SID, settings.TWILIO_AUTH_TOKEN
771
                    )
772
                    client.messages.create(
×
773
                        to=new_hire.phone,
774
                        from_=settings.TWILIO_FROM_NUMBER,
775
                        body=self._replace_vars(item["message"]),
776
                    )
777
                except Exception:
×
778
                    Notification.objects.create(
×
779
                        notification_type=(
780
                            Notification.Type.FAILED_TEXT_INTEGRATION_NOTIFICATION
781
                        ),
782
                        extra_text=self.name,
783
                        created_for=new_hire,
784
                    )
785
                    return True, None
×
786

787
        # Succesfully ran integration, add notification only when we are provisioning
788
        # access
789
        if self.has_user_context:
1✔
790
            Notification.objects.create(
1✔
791
                notification_type=Notification.Type.RAN_INTEGRATION,
792
                extra_text=self.name,
793
                created_for=new_hire,
794
            )
795
        return True, response
1✔
796

797
    def config_form(self, data=None):
1✔
798
        if self.skip_user_provisioning:
1✔
799
            from .forms import ManualIntegrationConfigForm
1✔
800

801
            return ManualIntegrationConfigForm(data=data)
1✔
802

803
        from .forms import IntegrationConfigForm
1✔
804

805
        return IntegrationConfigForm(instance=self, data=data)
1✔
806

807
    def clean_response(self, response) -> str:
1✔
808
        if isinstance(response, dict):
1✔
809
            try:
1✔
810
                response = json.dumps(response)
1✔
811
            except ValueError:
×
812
                response = str(response)
×
813

814
        for name, value in self.extra_args.items():
1✔
815
            if isinstance(value, dict):
1✔
816
                for inner_name, inner_value in value.items():
×
817
                    response = response.replace(
×
818
                        str(inner_value),
819
                        _("***Secret value for %(name)s***")
820
                        % {"name": name + "." + inner_name},
821
                    )
822
            else:
823
                response = response.replace(
1✔
824
                    str(value), _("***Secret value for %(name)s***") % {"name": name}
825
                )
826

827
            if name == "Authorization" and value.startswith("Basic"):
1✔
828
                response.replace(
×
829
                    base64.b64encode(value.split(" ", 1)[1].encode("ascii")).decode(
830
                        "ascii"
831
                    ),
832
                    "BASE64 ENCODED SECRET",
833
                )
834

835
        return response
1✔
836

837
    objects = IntegrationManager()
1✔
838

839

840
@receiver(post_delete, sender=Integration)
1✔
841
def delete_schedule(sender, instance, **kwargs):
1✔
842
    Schedule.objects.filter(name=instance.schedule_name).delete()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc