• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

digiteinfotech / kairon / 20162470757

12 Dec 2025 09:33AM UTC coverage: 91.069% (+0.004%) from 91.065%
20162470757

push

github

web-flow
Removed cleanup in analytics worker latest (#2294)

* removed cleanup

* property driven cleanup

* property driven cleanup

* commented tc for time being

* updated actor utility functions

* updated actor utility functions

3 of 3 new or added lines in 1 file covered. (100.0%)

18 existing lines in 4 files now uncovered.

30081 of 33031 relevant lines covered (91.07%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.58
/kairon/shared/pyscript/callback_pyscript_utils.py
1
import pickle
1✔
2
from calendar import timegm
1✔
3
from datetime import datetime, date
1✔
4
from email.mime.multipart import MIMEMultipart
1✔
5
from email.mime.text import MIMEText
1✔
6
from smtplib import SMTP
1✔
7
from typing import Text, Dict, Callable, List
1✔
8
import base64
1✔
9
from apscheduler.triggers.date import DateTrigger
1✔
10
from apscheduler.util import obj_to_ref, astimezone
1✔
11
from mongoengine import DoesNotExist
1✔
12
from pymongo import MongoClient
1✔
13
from tzlocal import get_localzone
1✔
14
from uuid6 import uuid7
1✔
15
from loguru import logger
1✔
16
from kairon import Utility
1✔
17
from kairon.events.executors.factory import ExecutorFactory
1✔
18
from kairon.exceptions import AppException
1✔
19
from kairon.shared.actions.data_objects import EmailActionConfig
1✔
20
from kairon.shared.actions.utils import ActionUtility
1✔
21
from bson import Binary
1✔
22
from types import ModuleType
1✔
23
from requests import Response
1✔
24
from cryptography.hazmat.primitives import hashes
1✔
25
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
1✔
26
from cryptography.hazmat.primitives.asymmetric import padding as asym_padding
1✔
27
from cryptography.hazmat.primitives.serialization import load_pem_private_key
1✔
28
from kairon.shared.callback.data_objects import CallbackConfig, CallbackData
1✔
29
import json as jsond
1✔
30

31
from kairon.shared.chat.user_media import UserMedia
1✔
32
from kairon.shared.cognition.data_objects import AnalyticsCollectionData
1✔
33

34

35
class CallbackScriptUtility:
1✔
36

37
    @staticmethod
1✔
38
    def generate_id():
1✔
39
        return uuid7().hex
1✔
40

41

42
    @staticmethod
1✔
43
    def datetime_to_utc_timestamp(timeval):
1✔
44
        """
45
        Converts a datetime instance to a timestamp.
46

47
        :type timeval: datetime
48
        :rtype: float
49

50
        """
51
        if timeval is not None:
1✔
52
            return timegm(timeval.utctimetuple()) + timeval.microsecond / 1000000
1✔
53

54

55
    @staticmethod
1✔
56
    def add_schedule_job(schedule_action: Text, date_time: datetime, data: Dict, timezone: Text, _id: Text = None,
1✔
57
                         bot: Text = None, kwargs=None):
58
        if not bot:
1✔
59
            raise AppException("Missing bot id")
1✔
60

61
        if not _id:
1✔
62
            _id = uuid7().hex
1✔
63

64
        if not data:
1✔
65
            data = {}
1✔
66

67
        data['bot'] = bot
1✔
68
        data['event'] = _id
1✔
69

70
        callback_config = CallbackConfig.get_entry(bot=bot, name=schedule_action)
1✔
71

72
        script = callback_config.get('pyscript_code')
1✔
73

74
        func = obj_to_ref(ExecutorFactory.get_executor_for_data(data).execute_task)
1✔
75

76
        schedule_data = {
1✔
77
            'source_code': script,
78
            'predefined_objects': data
79
        }
80

81
        args = (func, "scheduler_evaluator", schedule_data,)
1✔
82
        kwargs = {'task_type': "Callback"} if kwargs is None else {**kwargs, 'task_type': "Callback"}
1✔
83
        trigger = DateTrigger(run_date=date_time, timezone=timezone)
1✔
84

85
        next_run_time = trigger.get_next_fire_time(None, datetime.now(astimezone(timezone) or get_localzone()))
1✔
86

87
        job_kwargs = {
1✔
88
            'version': 1,
89
            'trigger': trigger,
90
            'executor': "default",
91
            'func': func,
92
            'args': tuple(args) if args is not None else (),
93
            'kwargs': kwargs,
94
            'id': _id,
95
            'name': "execute_task",
96
            'misfire_grace_time': 7200,
97
            'coalesce': True,
98
            'next_run_time': next_run_time,
99
            'max_instances': 1,
100
        }
101

102
        logger.info(job_kwargs)
1✔
103

104
        client = MongoClient(Utility.environment['database']['url'])
1✔
105
        events_db_name = Utility.environment["events"]["queue"]["name"]
1✔
106
        events_db = client.get_database(events_db_name)
1✔
107
        scheduler_collection = Utility.environment["events"]["scheduler"]["collection"]
1✔
108
        job_store_name = events_db.get_collection(scheduler_collection)
1✔
109
        event_server = Utility.environment['events']['server_url']
1✔
110

111
        job_store_name.insert_one({
1✔
112
            '_id': _id,
113
            'next_run_time': CallbackScriptUtility.datetime_to_utc_timestamp(next_run_time),
114
            'job_state': Binary(pickle.dumps(job_kwargs, pickle.HIGHEST_PROTOCOL))
115
        })
116

117
        http_response = ActionUtility.execute_http_request(
1✔
118
            f"{event_server}/api/events/dispatch/{_id}",
119
            "GET")
120

121
        if not http_response.get("success"):
1✔
122
            raise AppException(http_response)
1✔
123
        else:
124
            logger.info(http_response)
1✔
125

126
    @staticmethod
1✔
127
    def trigger_email(
1✔
128
                email: List[str],
129
                subject: str,
130
                body: str,
131
                smtp_url: str,
132
                smtp_port: int,
133
                sender_email: str,
134
                smtp_password: str,
135
                smtp_userid: str = None,
136
                tls: bool = False,
137
                content_type="html",
138
        ):
139
            """
140
            This is a sync email trigger.
141
            Sends an email to the mail id of the recipient
142

143
            :param smtp_userid:
144
            :param sender_email:
145
            :param tls:
146
            :param smtp_port:
147
            :param smtp_url:
148
            :param email: the mail id of the recipient
149
            :param smtp_password:
150
            :param subject: the subject of the mail
151
            :param body: the body of the mail
152
            :param content_type: "plain" or "html" content
153
            :return: None
154
            """
155
            smtp = None
1✔
156
            try:
1✔
157
                smtp = SMTP(smtp_url, port=smtp_port, timeout=10)
1✔
158
                smtp.connect(smtp_url, smtp_port)
1✔
159
                if tls:
1✔
160
                    smtp.starttls()
1✔
161
                smtp.login(smtp_userid if smtp_userid else sender_email, smtp_password)
1✔
162

163
                from_addr = sender_email
1✔
164
                mime_body = MIMEText(body, content_type)
1✔
165
                msg = MIMEMultipart("alternative")
1✔
166
                msg["Subject"] = subject
1✔
167
                msg["From"] = from_addr
1✔
168
                msg["To"] = ",".join(email)
1✔
169
                msg.attach(mime_body)
1✔
170

171
                smtp.sendmail(from_addr, email, msg.as_string())
1✔
172

173
            except Exception as e:
×
174
                print(f"Failed to send email: {e}")
×
UNCOV
175
                raise
×
176
            finally:
177
                if smtp:
1✔
178
                    try:
1✔
179
                        smtp.quit()
1✔
180
                    except Exception as quit_error:
×
UNCOV
181
                        print(f"Failed to quit SMTP connection cleanly: {quit_error}")
×
182

183

184
    @staticmethod
1✔
185
    def send_email(email_action: Text,
1✔
186
                   from_email: Text,
187
                   to_email: Text,
188
                   subject:  Text,
189
                   body: Text,
190
                   bot: Text):
191
        if not bot:
1✔
192
            raise AppException("Missing bot id")
1✔
193

194
        email_action_config = EmailActionConfig.objects(bot=bot, action_name=email_action).first()
1✔
195
        if not email_action_config:
1✔
196
            raise AppException(f"Email action '{email_action}' not configured for bot {bot}")
1✔
197
        action_config = email_action_config.to_mongo().to_dict()
1✔
198

199
        smtp_password = action_config.get('smtp_password').get("value")
1✔
200
        smtp_userid = action_config.get('smtp_userid').get("value")
1✔
201

202
        CallbackScriptUtility.trigger_email(
1✔
203
            email=[to_email],
204
            subject=subject,
205
            body=body,
206
            smtp_url=action_config['smtp_url'],
207
            smtp_port=action_config['smtp_port'],
208
            sender_email=from_email,
209
            smtp_password=smtp_password,
210
            smtp_userid=smtp_userid,
211
            tls=action_config['tls']
212
        )
213

214
    @staticmethod
1✔
215
    def perform_cleanup(local_vars: Dict):
1✔
216
        logger.info(f"local_vars: {local_vars}")
×
217
        filtered_locals = {}
×
218
        if local_vars:
×
219
            for key, value in local_vars.items():
×
220
                if not isinstance(value, Callable) and not isinstance(value, ModuleType):
×
221
                    if isinstance(value, datetime):
×
222
                        value = value.strftime("%m/%d/%Y, %H:%M:%S")
×
223
                    elif isinstance(value, date):
×
224
                        value = value.strftime("%Y-%m-%d")
×
225
                    elif isinstance(value, Response):
×
226
                        value = value.text
×
227
                    filtered_locals[key] = value
×
228
        logger.info(f"filtered_vars: {filtered_locals}")
×
UNCOV
229
        return filtered_locals
×
230

231

232
    @staticmethod
1✔
233
    def decrypt_request(request_body, private_key_pem):
1✔
234
        try:
1✔
235
            encrypted_data_b64 = request_body.get("encrypted_flow_data")
1✔
236
            encrypted_aes_key_b64 = request_body.get("encrypted_aes_key")
1✔
237
            iv_b64 = request_body.get("initial_vector")
1✔
238

239
            if not (encrypted_data_b64 and encrypted_aes_key_b64 and iv_b64):
1✔
240
                raise ValueError("Missing required encrypted data fields")
1✔
241

242
            # Decode base64 inputs
243
            encrypted_aes_key = base64.b64decode(encrypted_aes_key_b64)
1✔
244
            encrypted_data = base64.b64decode(encrypted_data_b64)
1✔
245
            iv = base64.b64decode(iv_b64)[:16]  # Ensure IV is exactly 16 bytes
1✔
246

247
            private_key = load_pem_private_key(private_key_pem.encode(), password=None)
1✔
248

249
            # Decrypt AES key using RSA and OAEP padding
250
            aes_key = private_key.decrypt(
1✔
251
                encrypted_aes_key,
252
                asym_padding.OAEP(
253
                    mgf=asym_padding.MGF1(algorithm=hashes.SHA256()),
254
                    algorithm=hashes.SHA256(),
255
                    label=None,
256
                ),
257
            )
258

259
            if len(aes_key) not in (16, 24, 32):
1✔
UNCOV
260
                raise ValueError(f"Invalid AES key size: {len(aes_key)} bytes")
×
261

262
            # Extract GCM tag (last 16 bytes)
263
            encrypted_body = encrypted_data[:-16]
1✔
264
            tag = encrypted_data[-16:]
1✔
265

266
            # Decrypt AES-GCM
267
            cipher = Cipher(algorithms.AES(aes_key), modes.GCM(iv, tag))
1✔
268
            decryptor = cipher.decryptor()
1✔
269
            decrypted_bytes = decryptor.update(encrypted_body) + decryptor.finalize()
1✔
270
            decrypted_data = jsond.loads(decrypted_bytes.decode("utf-8"))
1✔
271

272
            response_dict = {
1✔
273
                "decryptedBody": decrypted_data,
274
                "aesKeyBuffer": aes_key,
275
                "initialVectorBuffer": iv,
276
            }
277

278
            return response_dict
1✔
279

280
        except Exception as e:
1✔
281
            raise Exception(f"decryption failed-{str(e)}")
1✔
282

283

284
    @staticmethod
1✔
285
    def encrypt_response(response_body, aes_key_buffer, initial_vector_buffer):
1✔
286
        try:
1✔
287
            if aes_key_buffer is None:
1✔
288
                raise ValueError("AES key cannot be None")
1✔
289

290
            if initial_vector_buffer is None:
1✔
291
                raise ValueError("Initialization vector (IV) cannot be None")
1✔
292

293
            # Flip the IV
294
            flipped_iv = bytes(byte ^ 0xFF for byte in initial_vector_buffer)
1✔
295

296
            # Encrypt using AES-GCM
297
            encryptor = Cipher(algorithms.AES(aes_key_buffer), modes.GCM(flipped_iv)).encryptor()
1✔
298
            encrypted_bytes = encryptor.update(jsond.dumps(response_body).encode("utf-8")) + encryptor.finalize()
1✔
299
            encrypted_data_with_tag = encrypted_bytes + encryptor.tag
1✔
300

301
            # Encode result as base64
302
            encoded_data = base64.b64encode(encrypted_data_with_tag).decode("utf-8")
1✔
303
            return encoded_data
1✔
304
        except Exception as e:
1✔
305
            raise Exception(f"encryption failed-{str(e)}")
1✔
306

307

308
    @staticmethod
1✔
309
    def create_callback(callback_name: str, metadata: dict, bot: str, sender_id: str, channel: str,
1✔
310
                        name: str = None):
311
        if not callback_name:
1✔
312
            raise AppException("'callback name' must be provided and cannot be empty")
1✔
313

314
        if not name:
1✔
315
            name=callback_name
1✔
316
        callback_url, identifier, standalone = CallbackData.create_entry(
1✔
317
            name=name,
318
            callback_config_name=callback_name,
319
            bot=bot,
320
            sender_id=sender_id,
321
            channel=channel,
322
            metadata=metadata,
323
        )
324
        if standalone:
1✔
325
            return identifier
1✔
326
        else:
327
            return callback_url
1✔
328

329
    @staticmethod
1✔
330
    def save_as_pdf(text: str, bot: str, sender_id:str):
1✔
331
        try:
1✔
332
            _, media_id = UserMedia.save_markdown_as_pdf(
1✔
333
                bot=bot,
334
                sender_id=sender_id,
335
                text=text,
336
                filepath="report.pdf"
337
            )
338
            return media_id
1✔
339
        except Exception as e:
1✔
340
            raise Exception(f"encryption failed-{str(e)}")
1✔
341

342
    @staticmethod
1✔
343
    def get_data_analytics(collection_name: str, data_filters:dict, bot: str):
1✔
344
        if not bot:
1✔
345
            raise Exception("Missing bot id")
×
346

347
        normalized_name = collection_name.lower()
1✔
348
        match_filter = {
1✔
349
            "bot": bot,
350
            "collection_name": normalized_name
351
        }
352

353
        # Merge dictionary filter
354
        if data_filters:
1✔
355
            match_filter.update(data_filters)
1✔
356
        cursor = AnalyticsCollectionData._get_collection().aggregate([
1✔
357
            {"$match": match_filter},
358
            {"$project": {
359
                "_id": {"$toString": "$_id"},
360
                "collection_name": 1,
361
                "received_at": {
362
                "$dateToString": {
363
                    "format": "%Y-%m-%dT%H:%M:%S.%LZ",
364
                    "date": "$received_at"
365
                    }
366
                },
367
                "source": 1,
368
                "is_data_processed": 1,
369
                "data": 1
370
            }}
371
        ])
372

373
        return {"data": list(cursor)}
1✔
374

375
    @staticmethod
1✔
376
    def add_data_analytics(user: str, payload, bot: str = None):
1✔
377
        if not bot:
1✔
UNCOV
378
            raise Exception("Missing bot id")
×
379

380
        if not isinstance(payload, list):
1✔
381
            raise Exception("Payload must be a list of dicts")
1✔
382

383
        docs = []
1✔
384

385
        for item in payload:
1✔
386
            docs.append({
1✔
387
                "bot": bot,
388
                "user": user,
389
                "collection_name": item.get("collection_name", "").lower().strip(),
390
                "data": item.get("data"),
391
                "source": item.get("source", ""),
392
                "received_at": item.get("received_at") or datetime.utcnow(),
393
                "is_data_processed": False
394
            })
395

396
        AnalyticsCollectionData._get_collection().insert_many(docs)
1✔
397

398
        return {
1✔
399
            "message": "Records saved!"
400
        }
401

402
    @staticmethod
1✔
403
    def mark_as_processed(user: str, collection_name: str, bot: str = None):
1✔
404
        if not bot:
1✔
UNCOV
405
            raise Exception("Missing bot id")
×
406

407
        collection_name = collection_name.lower().strip()
1✔
408

409
        result = AnalyticsCollectionData.objects(
1✔
410
            bot=bot,
411
            collection_name=collection_name
412
        ).update(
413
            set__user=user,
414
            set__is_data_processed=True,
415
            multi=True
416
        )
417

418
        if result == 0:
1✔
419
            raise AppException("No records found for given bot and collection_name")
1✔
420

421
        return {
1✔
422
            "message": "Records updated!"
423
        }
424

425
    @staticmethod
1✔
426
    def delete_data_analytics(collection_id: str, bot: Text = None):
1✔
427
        if not bot:
1✔
UNCOV
428
            raise Exception("Missing bot id")
×
429

430
        try:
1✔
431
            AnalyticsCollectionData.objects(bot=bot, id=collection_id).delete()
1✔
432
        except DoesNotExist:
1✔
433
            raise AppException("Analytics Collection Data does not exists!")
1✔
434

435
        return {
1✔
436
            "message": f"Analytics Collection with ID {collection_id} has been successfully deleted.",
437
            "data": {"_id": collection_id}
438
        }
439

440
    @staticmethod
1✔
441
    def update_data_analytics(collection_id: str, user: str, payload: dict, bot: str = None):
1✔
442
        if not bot:
1✔
UNCOV
443
            raise Exception("Missing bot id")
×
444

445
        collection_name = payload.get("collection_name")
1✔
446
        data = payload.get("data", {})
1✔
447
        received_at=payload.get("received_at", datetime.utcnow())
1✔
448
        source=payload.get("source", "")
1✔
449
        is_data_processed=payload.get("is_data_processed", False)
1✔
450

451
        try:
1✔
452
            collection_obj = AnalyticsCollectionData.objects(bot=bot, id=collection_id, collection_name=collection_name).get()
1✔
453
            filtered_data = {
1✔
454
                k: v for k, v in data.items()
455
            }
456

457
            collection_obj.data.update(filtered_data)
1✔
458
            collection_obj.collection_name = collection_name
1✔
459
            collection_obj.user = user
1✔
460
            collection_obj.received_at = received_at
1✔
461
            collection_obj.source=source
1✔
462
            collection_obj.is_data_processed=is_data_processed
1✔
463
            collection_obj.save()
1✔
464
        except DoesNotExist:
1✔
465
            raise AppException("Analytics Collection Data with given id and collection_name not found!")
1✔
466

467
        return {
1✔
468
            "message": "Record updated!",
469
            "data": {"_id": collection_id}
470
        }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc