• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

digiteinfotech / kairon / 14257497228

04 Apr 2025 03:38AM UTC coverage: 90.377% (+0.02%) from 90.353%
14257497228

push

github

web-flow
create_callback function in callback pyscript (#1894)

Co-authored-by: spandan_mondal <spandan.mondal@nimblework.com>

10 of 10 new or added lines in 1 file covered. (100.0%)

45 existing lines in 6 files now uncovered.

25658 of 28390 relevant lines covered (90.38%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.45
/kairon/async_callback/processor.py
1
import asyncio
1✔
2
import functools
1✔
3
from concurrent.futures import ThreadPoolExecutor
1✔
4
from typing import Optional, Any, Text
1✔
5

6
from loguru import logger
1✔
7
from kairon import Utility
1✔
8
from kairon.async_callback.channel_message_dispacher import ChannelMessageDispatcher
1✔
9
from kairon.async_callback.utils import CallbackUtility
1✔
10
from kairon.exceptions import AppException
1✔
11
from kairon.shared.callback.data_objects import CallbackData, CallbackLog, CallbackExecutionMode
1✔
12
from kairon.shared.cloud.utils import CloudUtility
1✔
13
from kairon.shared.constants import EventClass
1✔
14
from kairon.shared.data.constant import TASK_TYPE
1✔
15

16
# Shared thread pool (at most 64 workers) to which
# CallbackProcessor.run_pyscript_async submits background pyscript runs.
async_task_executor = ThreadPoolExecutor(max_workers=64)
1✔
17

18

19
class CallbackProcessor:
    """
    Executes callback pyscripts, either inline (sync) or on the shared
    thread pool (async), and records the outcome through ``CallbackLog``.
    """

    @staticmethod
    def run_pyscript(script: str, predefined_objects: dict) -> Any:
        """
        Run python script

        The ``async_callback_action.pyscript.trigger_task`` flag read from
        ``Utility.environment`` selects the evaluator: when truthy the script is
        sent through ``CloudUtility.trigger_lambda``, otherwise it is handed to
        ``CallbackUtility.pyscript_handler``.

        :param script: pyscript source code
        :param predefined_objects: objects passed to the evaluator together with the source
        :return: the ``body`` entry of the evaluator response
        :raises AppException: when the evaluator reports a failure; the message is
            prefixed with "Error while executing pyscript: "
        """
        trigger_task = Utility.environment['async_callback_action']['pyscript']['trigger_task']
        payload = {
            'source_code': script,
            'predefined_objects': predefined_objects
        }
        try:
            if trigger_task:
                logger.info("Triggering lambda for pyscript evaluation")
                lambda_response = CloudUtility.trigger_lambda(EventClass.pyscript_evaluator, payload,
                                                              task_type=TASK_TYPE.CALLBACK.value)
                if CloudUtility.lambda_execution_failed(lambda_response):
                    err = lambda_response['Payload'].get('body') or lambda_response
                    raise AppException(f"{err}")
                # A response can pass the failure check above and still carry an error message.
                if err := lambda_response["Payload"].get('errorMessage'):
                    raise AppException(f"{err}")
                return lambda_response["Payload"].get('body')

            logger.info("Triggering Callback Server for pyscript evaluation")
            response = CallbackUtility.pyscript_handler(payload, None)
            if response["statusCode"] != 200:
                err = response.get('body') or response
                raise AppException(f"{err}")
            return response.get('body')
        except AppException as e:
            # Keep the original exception as __cause__ so the traceback shows where it started.
            raise AppException(f"Error while executing pyscript: {str(e)}") from e

    @staticmethod
    def parse_pyscript_data(data: dict) -> tuple:
        """
        Pull the fields consumed by the callback flow out of a pyscript result.

        :param data: result dictionary produced by the pyscript
        :return: tuple ``(bot_response, state, invalidate)``; a missing key yields ``None``
        """
        bot_response = data.get('bot_response')
        state = data.get('state')
        invalidate = data.get('invalidate')
        return bot_response, state, invalidate

    @staticmethod
    def _log_kwargs(entry: dict, callback: dict, bot: Any, sender_id: Any, channel: Any,
                    request_data: Any, callback_source: Any) -> dict:
        """
        Build the keyword arguments shared by the success and failure ``CallbackLog`` entries.
        """
        return {
            "name": entry.get("action_name"),
            "bot": bot,
            "channel": channel,
            "identifier": entry.get("identifier"),
            "pyscript_code": callback.get("pyscript_code"),
            "sender_id": sender_id,
            "request_data": request_data,
            "metadata": entry.get("metadata"),
            "callback_url": entry.get("callback_url"),
            "callback_source": callback_source,
        }

    @staticmethod
    def run_pyscript_async(script: str, predefined_objects: dict, callback: Any) -> None:
        """
        Run python script asynchronously

        The script is executed on ``async_task_executor``; once it finishes,
        ``callback`` is awaited with ``{'result': <data>}`` on success or
        ``{'error': <message>}`` when ``run_pyscript`` raised ``AppException``.

        :param script: pyscript source code
        :param predefined_objects: objects passed to the evaluator together with the source
        :param callback: coroutine function taking a single dict argument
        :raises AppException: when the task cannot be submitted to the executor
        """

        async def execute_script_task(cb: Any, src_code: Text, pre_objs: Optional[dict] = None):
            # Only the script run is guarded: an AppException raised by the callback
            # itself must not trigger a second callback invocation with an 'error' payload.
            try:
                data = CallbackProcessor.run_pyscript(script=src_code, predefined_objects=pre_objs)
            except AppException as ex:
                outcome = {'error': str(ex)}
            else:
                outcome = {'result': data}
            await cb(outcome)

        def run_async_task():
            loop = asyncio.new_event_loop()
            try:
                asyncio.set_event_loop(loop)
                loop.run_until_complete(execute_script_task(callback, script, predefined_objects))
            except Exception:
                # The Future returned by submit() is discarded, so log here or the error is lost.
                logger.exception("Unhandled error in async pyscript task")
            finally:
                # A fresh loop is created per task; close it so its resources are released.
                loop.close()

        try:
            async_task_executor.submit(run_async_task)
        except (AppException, RuntimeError) as e:
            # Executor.submit raises RuntimeError once the pool has been shut down.
            raise AppException(f"Error while executing pyscript: {str(e)}") from e

    @staticmethod
    async def async_callback(obj: dict, ent: dict, cb: dict, c_src: str, bot_id: str, sid: str, chnl: str, rd: dict):
        """
        Handle the outcome of a pyscript that was run asynchronously.

        On a ``result`` payload the callback state is updated, the bot response (if any)
        is dispatched to the channel and a success log entry is created. Every failure,
        including an ``error`` payload or an empty payload, is logged and stored as a
        failure log entry instead of being raised.

        :param obj: outcome dict holding either ``result`` or ``error``
        :param ent: callback data entry
        :param cb: callback configuration (provides ``pyscript_code``)
        :param c_src: callback source
        :param bot_id: bot id
        :param sid: sender id
        :param chnl: channel
        :param rd: request data of the callback request
        """
        try:
            if not obj:
                raise AppException("No response received from callback script")
            elif res := obj.get('result'):
                bot_response, state, invalidate = CallbackProcessor.parse_pyscript_data(res)
                CallbackData.update_state(ent['bot'], ent['identifier'], state, invalidate)
                if bot_response:
                    await ChannelMessageDispatcher.dispatch_message(bot_id, sid, bot_response, chnl)
                CallbackLog.create_success_entry(
                    log=str(bot_response),
                    **CallbackProcessor._log_kwargs(ent, cb, bot_id, sid, chnl, rd, c_src))
            elif error := obj.get('error'):
                raise AppException(f"Error while executing pyscript: {error}")
            else:
                # Payload present but with neither a truthy 'result' nor a truthy 'error'.
                raise AppException("No response received from callback script")
        except Exception as e:
            error_msg = str(e)
            logger.exception(error_msg)
            CallbackLog.create_failure_entry(
                error_log=error_msg,
                **CallbackProcessor._log_kwargs(ent, cb, bot_id, sid, chnl, rd, c_src))

    @staticmethod
    async def process_async_callback_request(token: str,
                                             identifier: Optional[str] = None,
                                             request_data: Optional[dict] = None,
                                             callback_source: Optional[str] = None):
        """
        Process async callback request

        Validates the callback entry and runs its pyscript according to the
        callback's ``execution_mode``. In ASYNC mode the script is scheduled in the
        background and its outcome is handled by ``async_callback``. In SYNC mode the
        script runs inline, state is updated, the response is dispatched and a success
        log entry is created. Any other mode falls through without running the script.

        :param token: callback token
        :param identifier: callback identifier
        :param request_data: incoming request data; its ``body`` is used for validation
        :param callback_source: source of the callback request
        :return: tuple ``(data, message, error_code)``; ``error_code`` is 0 on success
            and 400 when an ``AppException`` was raised while executing
        """
        predefined_objects = {
            "req": request_data,
            "req_host": callback_source,
        }
        error_code = 0
        message = "success"
        data = None
        # request_data defaults to None, so guard the lookup instead of raising AttributeError.
        request_body = request_data.get('body') if request_data else None
        entry, callback = CallbackData.validate_entry(token, identifier, request_body)
        predefined_objects.update(entry)
        bot = entry.get("bot")
        execution_mode = callback.get("execution_mode")
        try:
            if execution_mode == CallbackExecutionMode.ASYNC.value:
                logger.info(f"Executing async callback. Identifier: {entry.get('identifier')}")

                async def callback_function(rsp: dict):
                    await CallbackProcessor.async_callback(rsp, entry, callback, callback_source, bot,
                                                           entry.get("sender_id"), entry.get("channel"),
                                                           request_data)

                CallbackProcessor.run_pyscript_async(script=callback.get("pyscript_code"),
                                                     predefined_objects=predefined_objects,
                                                     callback=callback_function)
            elif execution_mode == CallbackExecutionMode.SYNC.value:
                logger.info(f"Executing sync callback. Identifier: {entry.get('identifier')}")
                result = CallbackProcessor.run_pyscript(script=callback.get("pyscript_code"),
                                                        predefined_objects=predefined_objects)
                bot_response, state, invalidate = CallbackProcessor.parse_pyscript_data(result)
                CallbackData.update_state(entry['bot'], entry['identifier'], state, invalidate)
                data = bot_response
                logger.info(f'Pyscript output: {bot_response, state, invalidate}')
                if data:
                    await ChannelMessageDispatcher.dispatch_message(bot, entry.get("sender_id"), data, entry.get("channel"))
                CallbackLog.create_success_entry(
                    log=str(data),
                    **CallbackProcessor._log_kwargs(entry, callback, bot, entry.get("sender_id"),
                                                    entry.get("channel"), request_data, callback_source))

        except AppException as e:
            error_code = 400
            message = str(e)
            CallbackLog.create_failure_entry(
                error_log=message,
                **CallbackProcessor._log_kwargs(entry, callback, bot, entry.get("sender_id"),
                                                entry.get("channel"), request_data, callback_source))

        return data, message, error_code
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc