• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

chiefonboarding / ChiefOnboarding / 6778659173

07 Nov 2023 01:14AM UTC coverage: 93.709% (+0.05%) from 93.66%
6778659173

Pull #383

github

web-flow
Merge b5553c900 into 5b93b8619
Pull Request #383: Add offboarding sequences

6271 of 6692 relevant lines covered (93.71%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.1
back/admin/integrations/models.py
1
import time
1✔
2
import base64
1✔
3
import io
1✔
4
import json
1✔
5
import uuid
1✔
6
from datetime import timedelta
1✔
7

8
import requests
1✔
9

10
from django.conf import settings
1✔
11
from django.core.exceptions import ValidationError
1✔
12
from django.db import models
1✔
13
from django.db.models.signals import post_delete
1✔
14
from django.dispatch import receiver
1✔
15
from django.template import Context, Template
1✔
16
from django.urls import reverse_lazy
1✔
17
from django.utils import timezone
1✔
18
from django.utils.crypto import get_random_string
1✔
19
from django.utils.translation import gettext_lazy as _
1✔
20
from django_q.models import Schedule
1✔
21
from django_q.tasks import schedule
1✔
22
from requests.exceptions import (
1✔
23
    HTTPError,
24
    InvalidHeader,
25
    InvalidJSONError,
26
    InvalidSchema,
27
    InvalidURL,
28
    JSONDecodeError,
29
    MissingSchema,
30
    SSLError,
31
    Timeout,
32
    TooManyRedirects,
33
    URLRequired,
34
)
35
from twilio.rest import Client
1✔
36

37
from admin.integrations.utils import get_value_from_notation
1✔
38
from admin.integrations.serializers import (
1✔
39
    WebhookManifestSerializer,
40
    SyncUsersManifestSerializer,
41
)
42
from misc.fernet_fields import EncryptedTextField
1✔
43
from misc.fields import EncryptedJSONField
1✔
44
from organization.models import Notification
1✔
45
from organization.utils import send_email_with_notification
1✔
46

47

48
class IntegrationManager(models.Manager):
1✔
49
    def get_queryset(self):
1✔
50
        return super().get_queryset()
1✔
51

52
    def sequence_integration_options(self):
1✔
53
        # any webhooks and account provisioning
54
        return self.get_queryset().filter(
1✔
55
            integration=Integration.Type.CUSTOM,
56
            manifest_type__in=[
57
                Integration.ManifestType.WEBHOOK,
58
                Integration.ManifestType.MANUAL_USER_PROVISIONING,
59
            ],
60
        )
61

62
    def account_provision_options(self):
1✔
63
        # only account provisioning (no general webhooks)
64
        return self.get_queryset().filter(
1✔
65
            integration=Integration.Type.CUSTOM,
66
            manifest_type=Integration.ManifestType.WEBHOOK,
67
            manifest__exists__isnull=False,
68
        ) | self.get_queryset().filter(
69
            integration=Integration.Type.CUSTOM,
70
            manifest_type=Integration.ManifestType.MANUAL_USER_PROVISIONING,
71
        )
72

73
    def import_users_options(self):
1✔
74
        # only import user items
75
        return (
1✔
76
            self.get_queryset()
77
            .filter(
78
                integration=Integration.Type.CUSTOM,
79
                manifest_type=Integration.ManifestType.SYNC_USERS,
80
            )
81
            .exclude(manifest__schedule__isnull=False)
82
        )
83

84

85
class Integration(models.Model):
1✔
86
    class Type(models.IntegerChoices):
1✔
87
        SLACK_BOT = 0, _("Slack bot")
1✔
88
        SLACK_ACCOUNT_CREATION = 1, _("Slack account creation")  # legacy
1✔
89
        GOOGLE_ACCOUNT_CREATION = 2, _("Google account creation")  # legacy
1✔
90
        GOOGLE_LOGIN = 3, _("Google Login")
1✔
91
        ASANA = 4, _("Asana")  # legacy
1✔
92
        CUSTOM = 10, _("Custom")
1✔
93

94
    class ManifestType(models.IntegerChoices):
1✔
95
        WEBHOOK = 0, _("Provision user accounts or trigger webhooks")
1✔
96
        SYNC_USERS = 1, _("Sync users")
1✔
97
        MANUAL_USER_PROVISIONING = 3, _(
1✔
98
            "Manual user account provisioning, no manifest required"
99
        )
100

101
    name = models.CharField(max_length=300, default="", blank=True)
1✔
102
    integration = models.IntegerField(choices=Type.choices)
1✔
103
    manifest_type = models.IntegerField(
1✔
104
        choices=ManifestType.choices, null=True, blank=True
105
    )
106
    token = EncryptedTextField(max_length=10000, default="", blank=True)
1✔
107
    refresh_token = EncryptedTextField(max_length=10000, default="", blank=True)
1✔
108
    base_url = models.CharField(max_length=22300, default="", blank=True)
1✔
109
    redirect_url = models.CharField(max_length=22300, default="", blank=True)
1✔
110
    account_id = models.CharField(max_length=22300, default="", blank=True)
1✔
111
    active = models.BooleanField(default=True)
1✔
112
    ttl = models.IntegerField(null=True, blank=True)
1✔
113
    expiring = models.DateTimeField(auto_now_add=True, blank=True)
1✔
114
    one_time_auth_code = models.UUIDField(
1✔
115
        default=uuid.uuid4, editable=False, unique=True
116
    )
117

118
    manifest = models.JSONField(default=dict, null=True, blank=True)
1✔
119
    extra_args = EncryptedJSONField(default=dict)
1✔
120
    enabled_oauth = models.BooleanField(default=False)
1✔
121

122
    # Slack
123
    app_id = models.CharField(max_length=100, default="")
1✔
124
    client_id = models.CharField(max_length=100, default="")
1✔
125
    client_secret = models.CharField(max_length=100, default="")
1✔
126
    signing_secret = models.CharField(max_length=100, default="")
1✔
127
    verification_token = models.CharField(max_length=100, default="")
1✔
128
    bot_token = EncryptedTextField(max_length=10000, default="", blank=True)
1✔
129
    bot_id = models.CharField(max_length=100, default="")
1✔
130

131
    @property
1✔
132
    def skip_user_provisioning(self):
1✔
133
        return self.manifest_type == Integration.ManifestType.MANUAL_USER_PROVISIONING
1✔
134

135
    @property
1✔
136
    def schedule_name(self):
1✔
137
        return f"User sync for integration: {self.id}"
1✔
138

139
    def clean(self):
1✔
140
        if not self.manifest or self.skip_user_provisioning:
1✔
141
            # ignore field if form doesn't have it or no manifest is necessary
142
            return
1✔
143

144
        if self.manifest_type == Integration.ManifestType.WEBHOOK:
1✔
145
            manifest_serializer = WebhookManifestSerializer(data=self.manifest)
1✔
146
        else:
147
            manifest_serializer = SyncUsersManifestSerializer(data=self.manifest)
1✔
148
        if not manifest_serializer.is_valid():
1✔
149
            raise ValidationError({"manifest": json.dumps(manifest_serializer.errors)})
×
150

151
    def save(self, *args, **kwargs):
1✔
152
        super().save(*args, **kwargs)
1✔
153

154
        # skip if it's not a sync user integration (no background jobs for the others)
155
        if self.manifest_type != Integration.ManifestType.SYNC_USERS:
1✔
156
            return
1✔
157

158
        # update the background job based on the manifest
159
        schedule_cron = self.manifest.get("schedule")
1✔
160

161
        try:
1✔
162
            schedule_obj = Schedule.objects.get(name=self.schedule_name)
1✔
163
        except Schedule.DoesNotExist:
1✔
164
            # Schedule does not exist yet, so create it if specified
165
            if schedule_cron:
1✔
166
                schedule(
1✔
167
                    "admin.integrations.tasks.sync_user_info",
168
                    self.id,
169
                    schedule_type=Schedule.CRON,
170
                    cron=schedule_cron,
171
                    name=self.schedule_name,
172
                )
173
            return
1✔
174

175
        # delete if cron was removed
176
        if schedule_cron is None:
1✔
177
            schedule_obj.delete()
1✔
178
            return
1✔
179

180
        # if schedule changed, then update
181
        if schedule_obj.cron != schedule_cron:
×
182
            schedule_obj.cron = schedule_cron
×
183
            schedule_obj.save()
×
184

185
    def register_manual_integration_run(self, user):
1✔
186
        from users.models import IntegrationUser
1✔
187

188
        integration_user, created = IntegrationUser.objects.update_or_create(
1✔
189
            user=user,
190
            integration=self,
191
            defaults={"revoked": user.termination_date is not None}
192
        )
193

194
    def run_request(self, data):
1✔
195
        url = self._replace_vars(data["url"])
1✔
196
        if "data" in data:
1✔
197
            post_data = self._replace_vars(json.dumps(data["data"]))
1✔
198
        else:
199
            post_data = {}
1✔
200
        if data.get("cast_data_to_json", False):
1✔
201
            try:
×
202
                post_data = json.loads(post_data)
×
203
            except Exception:
×
204
                pass
×
205

206
        # extract files from locally saved files and send them with the request
207
        files_to_send = {}
1✔
208
        for field_name, file_name in data.get("files", {}).items():
1✔
209
            try:
1✔
210
                files_to_send[field_name] = (file_name, self.params["files"][file_name])
1✔
211
            except KeyError:
1✔
212
                return (
1✔
213
                    False,
214
                    f"{file_name} could not be found in the locally saved files",
215
                )
216

217
        try:
1✔
218
            response = requests.request(
1✔
219
                data.get("method", "POST"),
220
                url,
221
                headers=self.headers(data.get("headers", {})),
222
                data=post_data,
223
                files=files_to_send,
224
                timeout=120,
225
            )
226
        except (InvalidJSONError, JSONDecodeError):
1✔
227
            return False, "JSON is invalid"
×
228

229
        except HTTPError:
1✔
230
            return False, "An HTTP error occurred"
×
231

232
        except SSLError:
1✔
233
            return False, "An SSL error occurred"
×
234

235
        except Timeout:
1✔
236
            return False, "The request timed out"
×
237

238
        except (URLRequired, MissingSchema, InvalidSchema, InvalidURL):
1✔
239
            return False, "The url is invalid"
1✔
240

241
        except TooManyRedirects:
×
242
            return False, "There are too many redirects"
×
243

244
        except InvalidHeader:
×
245
            return False, "The header is invalid"
×
246

247
        except:  # noqa E722
×
248
            return False, "There was an unexpected error with the request"
×
249

250
        if data.get("fail_when_4xx_response_code", True):
1✔
251
            try:
1✔
252
                response.raise_for_status()
1✔
253
            except Exception:
1✔
254
                return False, response.text
1✔
255

256
        return True, response
1✔
257

258
    def _replace_vars(self, text):
1✔
259
        params = {} if not hasattr(self, "params") else self.params
1✔
260
        params["redirect_url"] = settings.BASE_URL + reverse_lazy(
1✔
261
            "integrations:oauth-callback", args=[self.id]
262
        )
263
        if hasattr(self, "new_hire") and self.new_hire is not None:
1✔
264
            text = self.new_hire.personalize(text, self.extra_args | params)
1✔
265
            return text
1✔
266
        t = Template(text)
1✔
267
        context = Context(self.extra_args | params)
1✔
268
        text = t.render(context)
1✔
269
        return text
1✔
270

271
    @property
1✔
272
    def has_oauth(self):
1✔
273
        return "oauth" in self.manifest
1✔
274

275
    def headers(self, headers=None):
1✔
276
        if headers is None:
1✔
277
            headers = {}
1✔
278

279
        headers = (
1✔
280
            self.manifest.get("headers", {}).items()
281
            if len(headers) == 0
282
            else headers.items()
283
        )
284
        new_headers = {}
1✔
285
        for key, value in headers:
1✔
286
            # If Basic authentication then swap to base64
287
            if key == "Authorization" and value.startswith("Basic"):
1✔
288
                auth_details = self._replace_vars(value.split(" ", 1)[1])
1✔
289
                value = "Basic " + base64.b64encode(
1✔
290
                    auth_details.encode("ascii")
291
                ).decode("ascii")
292

293
            # Adding an empty string to force to return a string instead of a
294
            # safestring. Ref: https://github.com/psf/requests/issues/6159
295
            new_headers[self._replace_vars(key) + ""] = self._replace_vars(value) + ""
1✔
296
        return new_headers
1✔
297

298
    def user_exists(self, new_hire):
1✔
299
        # check if user has been created manually
300
        if self.skip_user_provisioning:
1✔
301
            from users.models import IntegrationUser
1✔
302

303
            try:
1✔
304
                user_integration = IntegrationUser.objects.get(
1✔
305
                    user=new_hire, integration=self
306
                )
307
            except IntegrationUser.DoesNotExist:
1✔
308
                return False
1✔
309

310
            return not user_integration.revoked
1✔
311

312
        self.new_hire = new_hire
1✔
313
        self.has_user_context = new_hire is not None
1✔
314

315
        # Renew token if necessary
316
        if not self.renew_key():
1✔
317
            return
×
318

319
        success, response = self.run_request(self.manifest["exists"])
1✔
320

321
        if not success:
1✔
322
            return None
1✔
323

324
        return self._replace_vars(self.manifest["exists"]["expected"]) in response.text
1✔
325

326
    def needs_user_info(self, user):
1✔
327
        if self.skip_user_provisioning:
1✔
328
            return False
1✔
329

330
        # form created from the manifest, this info is always needed to create a new
331
        # account. Check if there is anything that needs to be filled
332
        form = self.manifest.get("form", [])
1✔
333

334
        # extra items that are needed from the integration (often prefilled by admin)
335
        extra_user_info = self.manifest.get("extra_user_info", [])
1✔
336
        needs_more_info = any(
1✔
337
            item["id"] not in user.extra_fields.keys() for item in extra_user_info
338
        )
339

340
        return len(form) > 0 or needs_more_info
1✔
341

342
    def revoke_user(self, user):
1✔
343
        if self.skip_user_provisioning:
1✔
344
            # should never be triggered
345
            return False, "Cannot revoke manual integration"
1✔
346

347
        self.new_hire = user
1✔
348
        self.has_user_context = True
1✔
349

350
        # Renew token if necessary
351
        if not self.renew_key():
1✔
352
            return False, "Couldn't renew key"
×
353

354
        revoke_manifest = self.manifest.get("revoke", [])
1✔
355

356
        # add extra fields directly to params
357
        self.params = self.new_hire.extra_fields
1✔
358

359
        for item in revoke_manifest:
1✔
360
            success, response = self.run_request(item)
1✔
361

362
            if not success:
1✔
363
                return False, self.clean_response(response)
1✔
364

365
        return True, ""
1✔
366

367
    def renew_key(self):
1✔
368
        # Oauth2 refreshing access token if needed
369
        success = True
1✔
370
        if (
1✔
371
            self.has_oauth
372
            and "expires_in" in self.extra_args.get("oauth", {})
373
            and self.expiring < timezone.now()
374
        ):
375
            success, response = self.run_request(self.manifest["oauth"]["refresh"])
1✔
376

377
            if not success:
1✔
378
                user = self.new_hire if self.has_user_context else None
1✔
379
                Notification.objects.create(
1✔
380
                    notification_type=Notification.Type.FAILED_INTEGRATION,
381
                    extra_text=self.name,
382
                    created_for=user,
383
                    description="Refresh url: " + str(response),
384
                )
385
                return success
1✔
386

387
            self.extra_args["oauth"] |= response.json()
1✔
388
            if "expires_in" in response.json():
1✔
389
                self.expiring = timezone.now() + timedelta(
1✔
390
                    seconds=response.json()["expires_in"]
391
                )
392
            self.save(update_fields=["expiring", "extra_args"])
1✔
393
        return success
1✔
394

395
    def _check_condition(self, response, condition):
1✔
396
        value = self._replace_vars(condition.get("value"))
1✔
397
        try:
1✔
398
            # first argument will be taken from the response
399
            response_value = get_value_from_notation(
1✔
400
                condition.get("response_notation"), response.json()
401
            )
402
        except KeyError:
×
403
            # we know that the result might not be in the response yet, as we are
404
            # waiting for the correct response, so just respond with an empty string
405
            response_value = ""
×
406
        return value == response_value
1✔
407

408
    def _polling(self, item, response):
1✔
409
        polling = item.get("polling")
1✔
410
        continue_if = item.get("continue_if")
1✔
411
        interval = polling.get("interval")
1✔
412
        amount = polling.get("amount")
1✔
413

414
        got_expected_result = self._check_condition(response, continue_if)
1✔
415
        if got_expected_result:
1✔
416
            return True, response
×
417

418
        tried = 1
1✔
419
        while amount > tried:
1✔
420
            time.sleep(interval)
1✔
421
            success, response = self.run_request(item)
1✔
422
            got_expected_result = self._check_condition(response, continue_if)
1✔
423
            if got_expected_result:
1✔
424
                return True, response
1✔
425
            tried += 1
1✔
426
        # if exceeding the max amounts, then fail
427
        return False, response
1✔
428

429
    def execute(self, new_hire=None, params=None):
1✔
430
        self.params = params or {}
1✔
431
        self.params["responses"] = []
1✔
432
        self.params["files"] = {}
1✔
433
        self.new_hire = new_hire
1✔
434
        self.has_user_context = new_hire is not None
1✔
435

436
        if self.has_user_context:
1✔
437
            self.params |= new_hire.extra_fields
1✔
438
            self.new_hire = new_hire
1✔
439

440
        # Renew token if necessary
441
        if not self.renew_key():
1✔
442
            return False, None
×
443

444
        # Add generated secrets
445
        for item in self.manifest.get("initial_data_form", []):
1✔
446
            if "name" in item and item["name"] == "generate":
1✔
447
                self.extra_args[item["id"]] = get_random_string(length=10)
1✔
448

449
        # Run all requests
450
        for item in self.manifest["execute"]:
1✔
451
            success, response = self.run_request(item)
1✔
452

453
            # check if we need to poll before continuing
454
            if polling := item.get("polling", False):
1✔
455
                success, response = self._polling(item, response)
1✔
456

457
            # check if we need to block this integration based on condition
458
            if continue_if := item.get("continue_if", False):
1✔
459
                got_expected_result = self._check_condition(response, continue_if)
1✔
460
                if not got_expected_result:
1✔
461
                    response = self.clean_response(response=response)
1✔
462
                    Notification.objects.create(
1✔
463
                        notification_type=Notification.Type.BLOCKED_INTEGRATION,
464
                        extra_text=self.name,
465
                        created_for=new_hire,
466
                        description=f"Execute url ({item['url']}): {response}",
467
                    )
468
                    return False, response
1✔
469

470
            # No need to retry or log when we are importing users
471
            if not success:
1✔
472
                if self.has_user_context:
1✔
473
                    response = self.clean_response(response=response)
1✔
474
                    if polling:
1✔
475
                        response = "Polling timed out: " + response
×
476
                    Notification.objects.create(
1✔
477
                        notification_type=Notification.Type.FAILED_INTEGRATION,
478
                        extra_text=self.name,
479
                        created_for=new_hire,
480
                        description=f"Execute url ({item['url']}): {response}",
481
                    )
482
                # Retry url in one hour
483
                try:
1✔
484
                    schedule(
1✔
485
                        "admin.integrations.tasks.retry_integration",
486
                        new_hire.id,
487
                        self.id,
488
                        params,
489
                        name=(
490
                            f"Retrying integration {self.id} for new hire {new_hire.id}"
491
                        ),
492
                        next_run=timezone.now() + timedelta(hours=1),
493
                        schedule_type=Schedule.ONCE,
494
                    )
495
                except:  # noqa E722
×
496
                    # Only errors when item gets added another time, so we can safely
497
                    # let it pass.
498
                    pass
×
499
                return False, response
1✔
500

501
            # save if file, so we can reuse later
502
            save_as_file = item.get("save_as_file")
1✔
503
            if save_as_file is not None:
1✔
504
                self.params["files"][save_as_file] = io.BytesIO(response.content)
1✔
505

506
            # save json response temporarily to be reused in other parts
507
            try:
1✔
508
                self.params["responses"].append(response.json())
1✔
509
            except:  # noqa E722
×
510
                # if we save a file, then just append an empty dict
511
                self.params["responses"].append({})
×
512

513
            # store data coming back from response to the user, so we can reuse in other
514
            # integrations
515
            if store_data := item.get("store_data", {}):
1✔
516
                for new_hire_prop, notation_for_response in store_data.items():
1✔
517
                    try:
1✔
518
                        value = get_value_from_notation(
1✔
519
                            notation_for_response, response.json()
520
                        )
521
                    except KeyError:
1✔
522
                        return (
1✔
523
                            False,
524
                            f"Could not store data to new hire: {notation_for_response}"
525
                            f" not found in {self.clean_response(response.json())}",
526
                        )
527

528
                    # save to new hire and to temp var `params` on this model for use in
529
                    # the same integration
530
                    new_hire.extra_fields[new_hire_prop] = value
1✔
531
                    self.params[new_hire_prop] = value
1✔
532
                new_hire.save()
1✔
533

534
        # Run all post requests (notifications)
535
        for item in self.manifest.get("post_execute_notification", []):
1✔
536
            if item["type"] == "email":
1✔
537
                send_email_with_notification(
1✔
538
                    subject=self._replace_vars(item["subject"]),
539
                    message=self._replace_vars(item["message"]),
540
                    to=self._replace_vars(item["to"]),
541
                    notification_type=(
542
                        Notification.Type.SENT_EMAIL_INTEGRATION_NOTIFICATION
543
                    ),
544
                )
545
                return True, None
1✔
546
            else:
547
                try:
×
548
                    client = Client(
×
549
                        settings.TWILIO_ACCOUNT_SID, settings.TWILIO_AUTH_TOKEN
550
                    )
551
                    client.messages.create(
×
552
                        to=new_hire.phone,
553
                        from_=settings.TWILIO_FROM_NUMBER,
554
                        body=self._replace_vars(item["message"]),
555
                    )
556
                except Exception:
×
557
                    Notification.objects.create(
×
558
                        notification_type=(
559
                            Notification.Type.FAILED_TEXT_INTEGRATION_NOTIFICATION
560
                        ),
561
                        extra_text=self.name,
562
                        created_for=new_hire,
563
                    )
564
                    return True, None
×
565

566
        # Succesfully ran integration, add notification only when we are provisioning
567
        # access
568
        if self.has_user_context:
1✔
569
            Notification.objects.create(
1✔
570
                notification_type=Notification.Type.RAN_INTEGRATION,
571
                extra_text=self.name,
572
                created_for=new_hire,
573
            )
574
        return True, response
1✔
575

576
    def config_form(self, data=None):
1✔
577
        if self.skip_user_provisioning:
1✔
578
            from .forms import ManualIntegrationConfigForm
1✔
579

580
            return ManualIntegrationConfigForm(data=data)
1✔
581

582
        from .forms import IntegrationConfigForm
1✔
583

584
        return IntegrationConfigForm(instance=self, data=data)
1✔
585

586
    def clean_response(self, response):
1✔
587
        # if json, then convert to string to make it easier to replace values
588
        response = str(response)
1✔
589
        for name, value in self.extra_args.items():
1✔
590
            response = response.replace(
1✔
591
                str(value), _("***Secret value for %(name)s***") % {"name": name}
592
            )
593

594
        return response
1✔
595

596
    objects = IntegrationManager()
1✔
597

598

599
@receiver(post_delete, sender=Integration)
1✔
600
def delete_schedule(sender, instance, **kwargs):
1✔
601
    Schedule.objects.filter(name=instance.schedule_name).delete()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc