• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

digiteinfotech / kairon / 21936128812

12 Feb 2026 06:30AM UTC coverage: 91.139% (+0.001%) from 91.138%
21936128812

Pull #2338

github

web-flow
Merge a76836439 into 2946d3506
Pull Request #2338: Static callback fix 2

6 of 6 new or added lines in 3 files covered. (100.0%)

22 existing lines in 3 files now uncovered.

30208 of 33145 relevant lines covered (91.14%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.79
/kairon/shared/callback/data_objects.py
1
import base64
1✔
2
import time
1✔
3
from datetime import datetime
1✔
4
from enum import Enum
1✔
5
from typing import Any, Optional, Dict
1✔
6
import json
1✔
7

8
from pydantic import BaseModel
1✔
9
from uuid6 import uuid7
1✔
10

11
from mongoengine import StringField, DictField, DateTimeField, Document, DynamicField, IntField, BooleanField, \
1✔
12
    FloatField
13

14
from kairon import Utility
1✔
15
from kairon.exceptions import AppException
1✔
16
from kairon.shared.actions.data_objects import CallbackActionConfig
1✔
17
from kairon.shared.constants import EventClass
1✔
18
from kairon.shared.data.audit.data_objects import Auditlog
1✔
19
from kairon.shared.data.signals import push_notification
1✔
20
from cryptography.fernet import Fernet
1✔
21
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
1✔
22
from cryptography.hazmat.backends import default_backend
1✔
23

24

25
def check_nonempty_string(value, msg="Value must be a non-empty string"):
1✔
26
    if not isinstance(value, str) or not value:
1✔
27
        raise AppException(msg)
1✔
28

29

30
def encrypt_secret(secret: str) -> str:
1✔
31
    secret = secret.encode("utf-8")
1✔
32
    fernet = Fernet(Utility.environment['security']['fernet_key'].encode("utf-8"))
1✔
33
    return fernet.encrypt(secret).decode("utf-8")
1✔
34

35

36
def decrypt_secret(encrypted_secret: str) -> str:
1✔
37
    fernet = Fernet(Utility.environment['security']['fernet_key'].encode("utf-8"))
1✔
38
    return fernet.decrypt(encrypted_secret.encode("utf-8")).decode("utf-8")
1✔
39

40

41
def xor_encrypt_secret(secret: str) -> str:
1✔
42
    """
43
    AES small length text encryption
44
    TODO: change function name
45
    """
46
    key = Utility.environment['async_callback_action']['short_secret']['aes_key']
1✔
47
    iv = Utility.environment['async_callback_action']['short_secret']['aes_iv']
1✔
48
    key = bytes.fromhex(key)
1✔
49
    iv = bytes.fromhex(iv)
1✔
50
    secret_bytes = secret.encode()
1✔
51
    cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=default_backend())
1✔
52
    encryptor = cipher.encryptor()
1✔
53
    ciphertext = encryptor.update(secret_bytes) + encryptor.finalize()
1✔
54
    encoded_result = base64.urlsafe_b64encode(ciphertext).decode().rstrip("=")
1✔
55
    return encoded_result
1✔
56

57

58
def xor_decrypt_secret(encoded_secret: str) -> str:
1✔
59
    """
60
    AES encripted text decription function
61
    TODO: change function name
62
    """
63
    key = Utility.environment['async_callback_action']['short_secret']['aes_key']
1✔
64
    iv = Utility.environment['async_callback_action']['short_secret']['aes_iv']
1✔
65
    key = bytes.fromhex(key)
1✔
66
    iv = bytes.fromhex(iv)
1✔
67
    secret = None
1✔
68
    try:
1✔
69
        decoded_secret = base64.urlsafe_b64decode(encoded_secret + "=" * (4 - len(encoded_secret) % 4))
1✔
70
        cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=default_backend())
1✔
71
        decryptor = cipher.decryptor()
1✔
72
        secret = decryptor.update(decoded_secret) + decryptor.finalize()
1✔
73
        return secret.decode()
1✔
74
    except Exception:
1✔
75
        raise AppException("Invalid token!")
1✔
76

77

78
class CallbackExecutionMode(str, Enum):
1✔
79
    ASYNC = "async"
1✔
80
    SYNC = "sync"
1✔
81

82
class CallbackResponseType(str, Enum):
1✔
83
    KAIRON_JSON = "kairon_json"
1✔
84
    JSON = "json"
1✔
85
    TEXT = "text"
1✔
86

87

88
@push_notification.apply
1✔
89
class CallbackConfig(Auditlog):
1✔
90
    name = StringField(required=True)
1✔
91
    pyscript_code = StringField(required=True)
1✔
92
    validation_secret = StringField(required=True)
1✔
93
    execution_mode = StringField(default=CallbackExecutionMode.ASYNC.value,
1✔
94
                                 choices=[v.value for v in CallbackExecutionMode])
95
    expire_in = IntField(default=0)
1✔
96
    shorten_token = BooleanField(default=False)
1✔
97
    token_hash = StringField()
1✔
98
    token_value = StringField()
1✔
99
    standalone = BooleanField(default=False)
1✔
100
    standalone_id_path = StringField(default='')
1✔
101
    response_type = StringField(default=CallbackResponseType.KAIRON_JSON.value,
1✔
102
                                choices=[v.value for v in CallbackResponseType])
103
    bot = StringField(required=True)
1✔
104
    meta = {"indexes": [{"fields": ["bot", "name"]}]}
1✔
105

106
    @staticmethod
1✔
107
    def get_all_names(bot) -> list[str]:
1✔
108
        names = CallbackConfig.objects(bot=bot).distinct(field="name")
1✔
109
        return list(names)
1✔
110

111
    @staticmethod
1✔
112
    def get_entry(bot :str, name :str) -> dict:
1✔
113
        entry = CallbackConfig.objects(bot=bot, name__iexact=name).first()
1✔
114
        if not entry:
1✔
115
            raise AppException(f"Callback Configuration with name '{name}' does not exist!")
1✔
116
        dict_form = entry.to_mongo().to_dict()
1✔
117
        dict_form.pop("_id")
1✔
118
        return dict_form
1✔
119

120
    @staticmethod
1✔
121
    def create_entry(bot: str,
1✔
122
                     name: str,
123
                     pyscript_code: str,
124
                     execution_mode: str = CallbackExecutionMode.ASYNC.value,
125
                     expire_in: int = 30,
126
                     shorten_token: bool = False,
127
                     standalone: bool = False,
128
                     standalone_id_path: str = '',
129
                     response_type: str = CallbackResponseType.KAIRON_JSON.value,
130
                     **kwargs):
131
        check_nonempty_string(name)
1✔
132
        if standalone and not standalone_id_path:
1✔
UNCOV
133
            raise AppException("Standalone ID path is required for standalone callbacks!")
×
134
        Utility.is_exist(
1✔
135
            CallbackConfig,
136
            exp_message=f"Callback Configuration with name '{name}' exists!",
137
            name__iexact=name,
138
            bot=bot,
139
            raise_error=True
140
        )
141
        check_nonempty_string(pyscript_code)
1✔
142
        validation_secret = encrypt_secret(uuid7().hex)
1✔
143
        token_hash = None
1✔
144
        if shorten_token:
1✔
UNCOV
145
            token_hash = uuid7().hex
×
146
        config = CallbackConfig(name=name,
1✔
147
                                bot=bot,
148
                                pyscript_code=pyscript_code,
149
                                validation_secret=validation_secret,
150
                                execution_mode=execution_mode,
151
                                expire_in=expire_in,
152
                                shorten_token=shorten_token,
153
                                token_hash=token_hash,
154
                                standalone=standalone,
155
                                standalone_id_path=standalone_id_path,
156
                                response_type=response_type,
157
                                **kwargs)
158
        config.save()
1✔
159
        return config.to_mongo().to_dict()
1✔
160

161
    @staticmethod
1✔
162
    def get_auth_token(bot: str, name: str) -> tuple[str, bool]:
1✔
163
        entry = CallbackConfig.objects(bot=bot, name__iexact=name).first()
1✔
164
        if not entry:
1✔
165
            raise AppException(f"Callback Configuration with name '{name}' does not exist!")
1✔
166

167
        info = {
1✔
168
            "bot": entry.bot,
169
            "callback_name": entry.name,
170
            "validation_secret": decrypt_secret(entry.validation_secret),
171
            "expire_in": entry.expire_in,
172
        }
173

174
        token = encrypt_secret(json.dumps(info))
1✔
175

176
        if entry.shorten_token:
1✔
UNCOV
177
            entry.token_value = token
×
178
            entry.save()
×
179
            return xor_encrypt_secret(entry.token_hash), entry.standalone
×
180
        else:
181
            return token, entry.standalone
1✔
182

183
    @staticmethod
1✔
184
    def verify_auth_token(token: str):
1✔
185
        info = None
1✔
186
        if len(token) < 64:
1✔
187
            search_key = xor_decrypt_secret(token)
1✔
188
            config = CallbackConfig.objects(token_hash=search_key, shorten_token=True).first()
1✔
189
            if config:
1✔
190
                info = json.loads(decrypt_secret(config.token_value))
1✔
191
        else:
192
            info = json.loads(decrypt_secret(token))
1✔
193
        if not info:
1✔
UNCOV
194
            raise AppException("Invalid token!")
×
195

196
        config = CallbackConfig.objects(bot=info['bot'],
1✔
197
                                        name__iexact=info['callback_name'],
198
                                        ).first()
199
        if not config:
1✔
UNCOV
200
            raise AppException("Invalid token!")
×
201
        if decrypt_secret(config.validation_secret) != info['validation_secret']:
1✔
UNCOV
202
            raise AppException("Invalid token!")
×
203
        return config
1✔
204

205
    @staticmethod
1✔
206
    def edit(bot: str, name: str, **kwargs):
1✔
207
        check_nonempty_string(name)
1✔
208
        config = CallbackConfig.objects(bot=bot, name__iexact=name).first()
1✔
209
        if not config:
1✔
UNCOV
210
            raise AppException(f"Callback Configuration with name '{name}' does not exist!")
×
211
        for key, value in kwargs.items():
1✔
212
            setattr(config, key, value)
1✔
213
        if config.shorten_token and not config.token_hash:
1✔
UNCOV
214
            config.token_hash = uuid7().hex
×
215
        config.save()
1✔
216
        return config.to_mongo().to_dict()
1✔
217

218
    @staticmethod
1✔
219
    def delete_entry(bot: str, name: str):
1✔
220
        check_nonempty_string(name)
1✔
221
        callback_action = CallbackActionConfig.objects(bot=bot, callback_name=name).first()
1✔
222
        if callback_action:
1✔
UNCOV
223
            raise AppException(f"Cannot delete Callback Configuration '{name}' as it is attached to {callback_action.name} callback action!")
×
224
        config = CallbackConfig.objects(bot=bot, name__iexact=name).first()
1✔
225
        if not config:
1✔
UNCOV
226
            raise AppException(f"Callback Configuration with name '{name}' does not exist!")
×
227
        config.delete()
1✔
228
        return config.name
1✔
229

230
    @staticmethod
1✔
231
    def get_callback_url(bot: str, name: str):
1✔
232
        base_url = Utility.environment['async_callback_action']['url']
1✔
233
        auth_token, is_standalone = CallbackConfig.get_auth_token(bot, name)
1✔
234
        if not is_standalone:
1✔
UNCOV
235
            raise AppException(f"Callback Configuration with name '{name}' is not standalone!")
×
236
        callback_url = f"{base_url}/s/{auth_token}"
1✔
237
        return callback_url
1✔
238

239

240
class CallbackRecordStatusType(Enum):
1✔
241
    SUCCESS = "Success"
1✔
242
    FAILED = "Failed"
1✔
243

244

245
@push_notification.apply
1✔
246
class CallbackData(Document):
1✔
247
    """
248
    this represents a record of every callback execution generated by action trigger
249
    """
250
    action_name = StringField()
1✔
251
    callback_name = StringField(required=True)
1✔
252
    bot = StringField(required=True)
1✔
253
    sender_id = StringField(required=True)
1✔
254
    channel = StringField(required=True)
1✔
255
    metadata = DictField()
1✔
256
    identifier = StringField(required=True)
1✔
257
    timestamp = FloatField(default=time.time)
1✔
258
    callback_url = StringField()
1✔
259
    execution_mode = StringField(default=CallbackExecutionMode.ASYNC.value,
1✔
260
                                 choices=[v.value for v in CallbackExecutionMode.__members__.values()])
261
    state = DynamicField(default=0)
1✔
262
    is_valid = BooleanField(default=True)
1✔
263
    meta = {"indexes": [{"fields": ["bot", "identifier"]}]}
1✔
264

265
    @staticmethod
1✔
266
    def create_entry(name: str, callback_config_name: str, bot: str, sender_id: str, channel: str, metadata: dict, **kwargs):
1✔
267
        check_nonempty_string(name)
1✔
268
        check_nonempty_string(callback_config_name)
1✔
269
        check_nonempty_string(bot)
1✔
270
        check_nonempty_string(sender_id)
1✔
271
        check_nonempty_string(channel)
1✔
272
        identifier = f"{uuid7().hex}"
1✔
273
        base_url = Utility.environment['async_callback_action']['url']
1✔
274
        auth_token, is_standalone = CallbackConfig.get_auth_token(bot, callback_config_name)
1✔
275
        callback_url = f"{base_url}/"
1✔
276
        if is_standalone:
1✔
277
            callback_url += f"s/{auth_token}"
1✔
278
        else:
279
            callback_url += f"d/{identifier}/{auth_token}"
1✔
280

281
        record = CallbackData(action_name=name,
1✔
282
                              callback_name=callback_config_name,
283
                              bot=bot,
284
                              sender_id=sender_id,
285
                              channel=channel,
286
                              metadata=metadata,
287
                              identifier=identifier,
288
                              callback_url=callback_url,
289
                              timestamp=time.time(),
290
                              is_valid=True,
291
                              **kwargs)
292
        record.save()
1✔
293
        return callback_url, identifier, is_standalone
1✔
294

295
    @staticmethod
1✔
296
    def get_value_from_json(json_obj: Any, path: str):
1✔
297
        keys = path.split('.')
1✔
298
        value = json_obj
1✔
299
        try:
1✔
300
            for key in keys:
1✔
301
                if isinstance(value, list):
1✔
302
                    key = int(key)
1✔
303
                value = value[key]
1✔
304
        except (KeyError, IndexError, ValueError, TypeError):
1✔
305
            raise AppException(f"Cannot find identifier at path '{path}' in request data!", 422)
1✔
306

307
        return value
1✔
308

309
    @staticmethod
1✔
310
    def validate_entry(token: str, identifier: Optional[str] = None, request_body: Any = None) -> tuple[dict, dict]:
1✔
311
        check_nonempty_string(token)
1✔
312
        config_entry = CallbackConfig.verify_auth_token(token)
1✔
313

314
        if config_entry.standalone:
1✔
315
            if not request_body:
1✔
UNCOV
316
                raise AppException("Request data is required for standalone callbacks!")
×
317
            identifier = CallbackData.get_value_from_json(request_body, config_entry.standalone_id_path)
1✔
318

319
        record = CallbackData.objects(bot=config_entry.bot, identifier=identifier).first()
1✔
320
        if not record:
1✔
321
            raise AppException("Callback Record does not exist, invalid identifier!")
1✔
322
        if not record.is_valid:
1✔
UNCOV
323
            raise AppException("Callback has been invalidated!")
×
324
        if config_entry.expire_in > 0:
1✔
UNCOV
325
            exp_time = record.timestamp + config_entry.expire_in
×
326
            if exp_time < time.time():
×
327
                raise AppException("Callback time-limit expired")
×
328
        entry_dict = record.to_mongo().to_dict()
1✔
329
        entry_dict.pop('_id')
1✔
330
        entry_dict.pop('timestamp')
1✔
331
        callback_dict = config_entry.to_mongo().to_dict()
1✔
332
        return entry_dict, callback_dict
1✔
333

334
    @staticmethod
1✔
335
    def update_state(bot: str, identifier: str, state: dict, invalidate: bool):
1✔
336
        record = CallbackData.objects(bot=bot, identifier=identifier).first()
1✔
337
        if not record:
1✔
338
            raise AppException("Callback Record does not exist, invalid identifier!")
1✔
339
        record.state = state
1✔
340
        record.is_valid = not invalidate
1✔
341
        record.save()
1✔
342
        return record.to_mongo().to_dict()
1✔
343

344

345
@push_notification.apply
1✔
346
class CallbackLog(Document):
1✔
347
    """
348
        this represents the record of actual execution record of  callback after the callback url is triggered
349
    """
350
    callback_name = StringField(required=True)
1✔
351
    bot = StringField(required=True)
1✔
352
    channel = StringField(default='unsupported')
1✔
353
    identifier = StringField(required=True)
1✔
354
    pyscript_code = StringField(required=True)
1✔
355
    sender_id = StringField()
1✔
356
    log = StringField()
1✔
357
    timestamp = DateTimeField(default=datetime.utcnow)
1✔
358
    status = StringField(default=CallbackRecordStatusType.SUCCESS.value,
1✔
359
                         choices=[v.value for v in CallbackRecordStatusType.__members__.values()])
360
    request_data = DynamicField()
1✔
361
    metadata = DynamicField()
1✔
362
    callback_url = StringField(required=True)
1✔
363
    callback_source = StringField()
1✔
364

365
    meta = {"indexes": [{"fields": ["bot", "identifier"]}]}
1✔
366

367
    @staticmethod
1✔
368
    def create_success_entry(name: str,
1✔
369
                             bot: str,
370
                             channel: str,
371
                             identifier: str,
372
                             pyscript_code: str,
373
                             sender_id: str,
374
                             log: str,
375
                             request_data: Any,
376
                             metadata: dict,
377
                             callback_url: str,
378
                             callback_source: str) -> dict:
379
        check_nonempty_string(name)
1✔
380
        check_nonempty_string(bot)
1✔
381
        check_nonempty_string(identifier)
1✔
382
        check_nonempty_string(pyscript_code)
1✔
383
        check_nonempty_string(callback_url)
1✔
384
        record = CallbackLog(callback_name=name,
1✔
385
                             bot=bot,
386
                             channel=channel,
387
                             identifier=identifier,
388
                             pyscript_code=pyscript_code,
389
                             sender_id=sender_id,
390
                             log=log,
391
                             request_data=request_data,
392
                             metadata=metadata,
393
                             callback_url=callback_url,
394
                             callback_source=callback_source,
395
                             status=CallbackRecordStatusType.SUCCESS.value,
396
                             timestamp=datetime.utcnow())
397
        record.save()
1✔
398
        return record.to_mongo().to_dict()
1✔
399

400
    @staticmethod
1✔
401
    def create_failure_entry(name: str,
1✔
402
                             bot: str,
403
                             channel: str,
404
                             identifier: str,
405
                             pyscript_code: str,
406
                             sender_id: str,
407
                             error_log: str,
408
                             request_data: Any,
409
                             metadata: dict,
410
                             callback_url: str,
411
                             callback_source: str) -> dict:
412
        check_nonempty_string(name)
1✔
413
        check_nonempty_string(bot)
1✔
414
        check_nonempty_string(identifier)
1✔
415
        check_nonempty_string(pyscript_code)
1✔
416
        check_nonempty_string(callback_url)
1✔
417
        record = CallbackLog(callback_name=name,
1✔
418
                             bot=bot,
419
                             channel=channel,
420
                             identifier=identifier,
421
                             pyscript_code=pyscript_code,
422
                             sender_id=sender_id,
423
                             log=error_log,
424
                             request_data=request_data,
425
                             metadata=metadata,
426
                             callback_url=callback_url,
427
                             callback_source=callback_source,
428
                             status=CallbackRecordStatusType.FAILED.value,
429
                             timestamp=datetime.utcnow())
430
        record.save()
1✔
431
        return record.to_mongo().to_dict()
1✔
432

433
    @staticmethod
1✔
434
    def get_logs(query: dict, offset: int, limit: int) -> tuple[list[dict], int]:
1✔
435
        logs = CallbackLog.objects(**query).skip(offset).limit(limit).exclude('id').order_by('-timestamp').to_json()
1✔
436
        logs_dict_list = json.loads(logs)
1✔
437
        for log in logs_dict_list:
1✔
438
            log['timestamp'] = log['timestamp']['$date']
1✔
439
        total = CallbackLog.objects(**query).count()
1✔
440
        return logs_dict_list, total
1✔
441

442
class PyscriptPayload(BaseModel):
1✔
443
    """
444
    Incoming JSON payload for restricted-Python execution.
445
    """
446
    source_code: str
1✔
447
    predefined_objects: Optional[Dict[str, Any]] = None
1✔
448

449

450
class CallbackRequest(BaseModel):
1✔
451
    event_class: EventClass
1✔
452
    data: dict
1✔
453
    task_type: str
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc