• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

digiteinfotech / kairon / 14663284993

25 Apr 2025 11:13AM UTC coverage: 90.38% (-0.02%) from 90.399%
14663284993

Pull #1933

github

web-flow
Merge a2cf12feb into b67f5642f
Pull Request #1933: move main pyscript execution to callback server and added and fixed test cases for the same

230 of 288 new or added lines in 8 files covered. (79.86%)

87 existing lines in 5 files now uncovered.

25995 of 28762 relevant lines covered (90.38%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.05
/kairon/shared/pyscript/callback_pyscript_utils.py
1
import pickle
1✔
2
from calendar import timegm
1✔
3
from datetime import datetime, date
1✔
4
from email.mime.multipart import MIMEMultipart
1✔
5
from email.mime.text import MIMEText
1✔
6
from smtplib import SMTP
1✔
7
from typing import Text, Dict, Callable, List
1✔
8
import base64
1✔
9
from apscheduler.triggers.date import DateTrigger
1✔
10
from apscheduler.util import obj_to_ref, astimezone
1✔
11
from pymongo import MongoClient
1✔
12
from tzlocal import get_localzone
1✔
13
from uuid6 import uuid7
1✔
14
from loguru import logger
1✔
15
from kairon import Utility
1✔
16
from kairon.events.executors.factory import ExecutorFactory
1✔
17
from kairon.exceptions import AppException
1✔
18
from kairon.shared.actions.data_objects import EmailActionConfig
1✔
19
from kairon.shared.actions.utils import ActionUtility
1✔
20
from bson import Binary
1✔
21
from types import ModuleType
1✔
22
from requests import Response
1✔
23
from cryptography.hazmat.primitives import hashes
1✔
24
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
1✔
25
from cryptography.hazmat.primitives.asymmetric import padding as asym_padding
1✔
26
from cryptography.hazmat.primitives.serialization import load_pem_private_key
1✔
27
from kairon.shared.callback.data_objects import CallbackConfig, CallbackData
1✔
28
import json as jsond
1✔
29

30
from kairon.shared.chat.user_media import UserMedia
1✔
31

32

33
class CallbackScriptUility:
1✔
34

35
    @staticmethod
    def generate_id():
        """Return the hex form of a freshly generated ``uuid7`` value."""
        new_id = uuid7()
        return new_id.hex
1✔
38

39

40
    @staticmethod
1✔
41
    def datetime_to_utc_timestamp(timeval):
1✔
42
        """
43
        Converts a datetime instance to a timestamp.
44

45
        :type timeval: datetime
46
        :rtype: float
47

48
        """
49
        if timeval is not None:
1✔
50
            return timegm(timeval.utctimetuple()) + timeval.microsecond / 1000000
1✔
51

52

53
    @staticmethod
    def add_schedule_job(schedule_action: Text, date_time: datetime, data: Dict, timezone: Text, _id: Text = None,
                         bot: Text = None, kwargs=None):
        """
        Store a one-time scheduled job for a callback script and ask the event server to dispatch it.

        The script saved under ``schedule_action`` for ``bot`` is fetched through
        ``CallbackConfig.get_entry``, wrapped in a job-state dict, pickled and
        inserted into the scheduler collection. The event server's
        ``/api/events/dispatch/{_id}`` endpoint is then called.

        :param schedule_action: name of the callback config whose ``pyscript_code`` is scheduled
        :param date_time: run date handed to ``DateTrigger``
        :param data: objects passed to the script as ``predefined_objects``; updated in place
            with the ``bot`` and ``event`` keys
        :param timezone: timezone handed to ``DateTrigger``
        :param _id: job id; a new ``uuid7`` hex value is generated when omitted
        :param bot: bot id (required)
        :param kwargs: extra task keyword arguments; ``task_type`` is always set to ``"Callback"``
        :raises AppException: if ``bot`` is missing or the dispatch response has no truthy ``success``
        :return: None
        """
        if not bot:
            raise AppException("Missing bot id")

        if not _id:
            _id = uuid7().hex

        if not data:
            data = {}

        data['bot'] = bot
        data['event'] = _id

        callback_config = CallbackConfig.get_entry(bot=bot, name=schedule_action)
        script = callback_config.get('pyscript_code')

        func = obj_to_ref(ExecutorFactory.get_executor().execute_task)

        schedule_data = {
            'source_code': script,
            'predefined_objects': data
        }

        args = (func, "scheduler_evaluator", schedule_data,)
        kwargs = {'task_type': "Callback"} if kwargs is None else {**kwargs, 'task_type': "Callback"}
        trigger = DateTrigger(run_date=date_time, timezone=timezone)

        next_run_time = trigger.get_next_fire_time(None, datetime.now(astimezone(timezone) or get_localzone()))

        # NOTE(review): the key set presumably mirrors the job state the scheduler unpickles - confirm.
        job_kwargs = {
            'version': 1,
            'trigger': trigger,
            'executor': "default",
            'func': func,
            'args': args,
            'kwargs': kwargs,
            'id': _id,
            'name': "execute_task",
            'misfire_grace_time': 7200,
            'coalesce': True,
            'next_run_time': next_run_time,
            'max_instances': 1,
        }

        logger.info(job_kwargs)

        events_db_name = Utility.environment["events"]["queue"]["name"]
        scheduler_collection = Utility.environment["events"]["scheduler"]["collection"]
        event_server = Utility.environment['events']['server_url']

        client = MongoClient(Utility.environment['database']['url'])
        try:
            job_store = client.get_database(events_db_name).get_collection(scheduler_collection)
            job_store.insert_one({
                '_id': _id,
                'next_run_time': CallbackScriptUility.datetime_to_utc_timestamp(next_run_time),
                'job_state': Binary(pickle.dumps(job_kwargs, pickle.HIGHEST_PROTOCOL))
            })
        finally:
            # The client is only needed for this single insert; close it so connections are not leaked per call.
            client.close()

        http_response = ActionUtility.execute_http_request(
            f"{event_server}/api/events/dispatch/{_id}",
            "GET")

        if not http_response.get("success"):
            raise AppException(http_response)

        logger.info(http_response)
1✔
123

124
    @staticmethod
    def trigger_email(
            email: List[str],
            subject: str,
            body: str,
            smtp_url: str,
            smtp_port: int,
            sender_email: str,
            smtp_password: str,
            smtp_userid: str = None,
            tls: bool = False,
            content_type="html",
    ):
        """
        Send an email synchronously over SMTP.

        :param email: list of recipient addresses
        :param subject: the subject of the mail
        :param body: the body of the mail
        :param smtp_url: SMTP server host
        :param smtp_port: SMTP server port
        :param sender_email: address used as the sender; also the login id when ``smtp_userid`` is not given
        :param smtp_password: password used for the SMTP login
        :param smtp_userid: optional login id
        :param tls: when true, ``starttls()`` is issued before login
        :param content_type: "plain" or "html" content
        :return: None
        """
        # Passing the host to the constructor already opens the connection, so a second
        # connect() call would open another socket and drop the first one unclosed.
        smtp = SMTP(smtp_url, port=smtp_port, timeout=10)
        try:
            if tls:
                smtp.starttls()
            smtp.login(smtp_userid if smtp_userid else sender_email, smtp_password)

            msg = MIMEMultipart("alternative")
            msg["Subject"] = subject
            msg["From"] = sender_email
            msg["To"] = ",".join(email)
            msg.attach(MIMEText(body, content_type))

            smtp.sendmail(sender_email, email, msg.as_string())
            smtp.quit()
        finally:
            # Does nothing after a successful quit(); releases the socket when a step above raised.
            smtp.close()
1✔
167

168

169
    @staticmethod
    def send_email(email_action: Text,
                   from_email: Text,
                   to_email: Text,
                   subject: Text,
                   body: Text,
                   bot: Text):
        """
        Send an email using the SMTP settings stored for an email action.

        :param email_action: ``action_name`` of the ``EmailActionConfig`` to use
        :param from_email: sender address
        :param to_email: single recipient address
        :param subject: the subject of the mail
        :param body: the body of the mail
        :param bot: bot id (required)
        :raises AppException: if ``bot`` is missing or no matching email action exists
        :return: None
        """
        if not bot:
            raise AppException("Missing bot id")

        email_action_config = EmailActionConfig.objects(bot=bot, action_name=email_action).first()
        if email_action_config is None:
            # first() returns None when nothing matches; fail with a clear message instead of AttributeError.
            raise AppException(f"Email action '{email_action}' not found")
        action_config = email_action_config.to_mongo().to_dict()

        smtp_password = action_config.get('smtp_password').get("value")
        # The user id is optional: trigger_email falls back to the sender address when it is None.
        smtp_userid = (action_config.get('smtp_userid') or {}).get("value")

        CallbackScriptUility.trigger_email(
            email=[to_email],
            subject=subject,
            body=body,
            smtp_url=action_config['smtp_url'],
            smtp_port=action_config['smtp_port'],
            sender_email=from_email,
            smtp_password=smtp_password,
            smtp_userid=smtp_userid,
            tls=action_config['tls']
        )
196

197
    @staticmethod
    def perform_cleanup(local_vars: Dict):
        """
        Return a filtered copy of ``local_vars``.

        Callables and modules are dropped. ``datetime`` and ``date`` values are
        replaced by formatted strings, and ``Response`` values by their ``text``.
        Everything else is copied unchanged.

        :param local_vars: mapping of variable names to values; may be empty or ``None``
        :return: the filtered dict
        """
        logger.info(f"local_vars: {local_vars}")
        filtered_locals = {}
        for key, value in (local_vars or {}).items():
            if isinstance(value, Callable) or isinstance(value, ModuleType):
                continue
            # datetime must be tested before date: every datetime is also a date.
            if isinstance(value, datetime):
                filtered_locals[key] = value.strftime("%m/%d/%Y, %H:%M:%S")
            elif isinstance(value, date):
                filtered_locals[key] = value.strftime("%Y-%m-%d")
            elif isinstance(value, Response):
                filtered_locals[key] = value.text
            else:
                filtered_locals[key] = value
        logger.info(f"filtered_vars: {filtered_locals}")
        return filtered_locals
×
213

214

215
    @staticmethod
1✔
216
    def decrypt_request(request_body, private_key_pem):
1✔
NEW
217
        try:
×
NEW
218
            encrypted_data_b64 = request_body.get("encrypted_flow_data")
×
NEW
219
            encrypted_aes_key_b64 = request_body.get("encrypted_aes_key")
×
NEW
220
            iv_b64 = request_body.get("initial_vector")
×
221

NEW
222
            if not (encrypted_data_b64 and encrypted_aes_key_b64 and iv_b64):
×
NEW
223
                raise ValueError("Missing required encrypted data fields")
×
224

225
            # Decode base64 inputs
NEW
226
            encrypted_aes_key = base64.b64decode(encrypted_aes_key_b64)
×
NEW
227
            encrypted_data = base64.b64decode(encrypted_data_b64)
×
NEW
228
            iv = base64.b64decode(iv_b64)[:16]  # Ensure IV is exactly 16 bytes
×
229

NEW
230
            private_key = load_pem_private_key(private_key_pem.encode(), password=None)
×
231

232
            # Decrypt AES key using RSA and OAEP padding
NEW
233
            aes_key = private_key.decrypt(
×
234
                encrypted_aes_key,
235
                asym_padding.OAEP(
236
                    mgf=asym_padding.MGF1(algorithm=hashes.SHA256()),
237
                    algorithm=hashes.SHA256(),
238
                    label=None,
239
                ),
240
            )
241

NEW
242
            if len(aes_key) not in (16, 24, 32):
×
NEW
243
                raise ValueError(f"Invalid AES key size: {len(aes_key)} bytes")
×
244

245
            # Extract GCM tag (last 16 bytes)
NEW
246
            encrypted_body = encrypted_data[:-16]
×
NEW
247
            tag = encrypted_data[-16:]
×
248

249
            # Decrypt AES-GCM
NEW
250
            cipher = Cipher(algorithms.AES(aes_key), modes.GCM(iv, tag))
×
NEW
251
            decryptor = cipher.decryptor()
×
NEW
252
            decrypted_bytes = decryptor.update(encrypted_body) + decryptor.finalize()
×
NEW
253
            decrypted_data = jsond.loads(decrypted_bytes.decode("utf-8"))
×
254

NEW
255
            response_dict = {
×
256
                "decryptedBody": decrypted_data,
257
                "aesKeyBuffer": aes_key,
258
                "initialVectorBuffer": iv,
259
            }
260

NEW
261
            return response_dict
×
262

NEW
263
        except Exception as e:
×
NEW
264
            raise Exception(f"decryption failed-{str(e)}")
×
265

266

267
    @staticmethod
1✔
268
    def encrypt_response(response_body, aes_key_buffer, initial_vector_buffer):
1✔
NEW
269
        try:
×
NEW
270
            if aes_key_buffer is None:
×
NEW
271
                raise ValueError("AES key cannot be None")
×
272

NEW
273
            if initial_vector_buffer is None:
×
NEW
274
                raise ValueError("Initialization vector (IV) cannot be None")
×
275

276
            # Flip the IV
NEW
277
            flipped_iv = bytes(byte ^ 0xFF for byte in initial_vector_buffer)
×
278

279
            # Encrypt using AES-GCM
NEW
280
            encryptor = Cipher(algorithms.AES(aes_key_buffer), modes.GCM(flipped_iv)).encryptor()
×
NEW
281
            encrypted_bytes = encryptor.update(jsond.dumps(response_body).encode("utf-8")) + encryptor.finalize()
×
NEW
282
            encrypted_data_with_tag = encrypted_bytes + encryptor.tag
×
283

284
            # Encode result as base64
NEW
285
            encoded_data = base64.b64encode(encrypted_data_with_tag).decode("utf-8")
×
NEW
286
            return encoded_data
×
NEW
287
        except Exception as e:
×
NEW
288
            raise Exception(f"encryption failed-{str(e)}")
×
289

290

291
    @staticmethod
    def create_callback(callback_name: str, metadata: dict, bot: str, sender_id: str, channel: str,
                        name: str = 'callback_pyscript'):
        """
        Create a callback entry through ``CallbackData.create_entry``.

        :param callback_name: passed as ``callback_config_name``
        :param metadata: metadata stored with the entry
        :param bot: bot id
        :param sender_id: sender id
        :param channel: channel name
        :param name: entry name, ``'callback_pyscript'`` by default
        :return: the entry identifier when the entry is reported as standalone,
            otherwise the callback URL
        """
        entry = CallbackData.create_entry(
            name=name,
            callback_config_name=callback_name,
            bot=bot,
            sender_id=sender_id,
            channel=channel,
            metadata=metadata,
        )
        callback_url, identifier, standalone = entry
        return identifier if standalone else callback_url
1✔
306

307
    @staticmethod
1✔
308
    def save_as_pdf(text: str, bot: str, sender_id:str):
1✔
NEW
309
        try:
×
NEW
310
            _, media_id = UserMedia.save_markdown_as_pdf(
×
311
                bot=bot,
312
                sender_id=sender_id,
313
                text=text,
314
                filepath="report.pdf"
315
            )
NEW
316
            return media_id
×
NEW
317
        except Exception as e:
×
NEW
318
            raise Exception(f"encryption failed-{str(e)}")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc