• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

digiteinfotech / kairon / 17832844462

18 Sep 2025 03:00PM UTC coverage: 91.028% (+0.008%) from 91.02%
17832844462

push

github

web-flow
disptach resonse fix in callback pyscript test cases (#2165)

Co-authored-by: Aniket Kharkia <aniket.kharkia@nimblework.com>

28836 of 31678 relevant lines covered (91.03%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.35
/kairon/async_callback/processor.py
1
import asyncio
1✔
2
import functools
1✔
3
from concurrent.futures import ThreadPoolExecutor
1✔
4
from typing import Optional, Any, Text
1✔
5

6
from loguru import logger
1✔
7
from kairon import Utility
1✔
8
from kairon.async_callback.channel_message_dispacher import ChannelMessageDispatcher
1✔
9
from kairon.async_callback.utils import CallbackUtility
1✔
10
from kairon.exceptions import AppException
1✔
11
from kairon.shared.data.processor import MongoProcessor
1✔
12
from kairon.shared.callback.data_objects import CallbackData, CallbackLog, CallbackExecutionMode, CallbackResponseType
1✔
13
from kairon.shared.cloud.utils import CloudUtility
1✔
14
from kairon.shared.constants import EventClass
1✔
15
from kairon.shared.data.constant import TASK_TYPE
1✔
16

17
async_task_executor = ThreadPoolExecutor(max_workers=64)
1✔
18

19
class CallbackProcessor:
1✔
20
    @staticmethod
1✔
21
    def run_pyscript(script: str, predefined_objects: dict):
1✔
22
        """
23
        Run python script
24
        """
25
        trigger_task = Utility.environment['async_callback_action']['pyscript']['trigger_task']
1✔
26
        try:
1✔
27
            if trigger_task:
1✔
28
                logger.info("Triggering lambda for pyscript evaluation")
1✔
29
                lambda_response = CloudUtility.trigger_lambda(EventClass.pyscript_evaluator, {
1✔
30
                    'source_code': script,
31
                    'predefined_objects': predefined_objects
32
                }, task_type=TASK_TYPE.CALLBACK.value)
33
                if CloudUtility.lambda_execution_failed(lambda_response):
1✔
34
                    err = lambda_response['Payload'].get('body') or lambda_response
×
35
                    raise AppException(f"{err}")
×
36
                if err := lambda_response["Payload"].get('errorMessage'):
1✔
37
                    raise AppException(f"{err}")
1✔
38
                result = lambda_response["Payload"].get('body')
1✔
39
                return result
1✔
40
            else:
41
                logger.info("Triggering Callback Server for pyscript evaluation")
1✔
42
                response = CallbackUtility.pyscript_handler({
1✔
43
                    'source_code': script,
44
                    'predefined_objects': predefined_objects
45
                }, None)
46
                if response["statusCode"] != 200:
1✔
47
                    err = response.get('body') or response
1✔
48
                    raise AppException(f"{err}")
1✔
49
                result = response.get('body')
1✔
50
                return result
1✔
51
        except AppException as e:
1✔
52
            raise AppException(f"Error while executing pyscript: {str(e)}")
1✔
53

54
    @staticmethod
1✔
55
    def parse_pyscript_data(data: dict):
1✔
56
        bot_response = data.get('bot_response')
1✔
57
        state = data.get('state')
1✔
58
        invalidate = data.get('invalidate')
1✔
59
        dispatch_bot_response=data.get('dispatch_bot_response', False)
1✔
60
        return bot_response, state, invalidate, dispatch_bot_response
1✔
61

62
    @staticmethod
1✔
63
    def run_pyscript_async(script: str, predefined_objects: dict, callback: Any):
1✔
64
        """
65
        Run python script asynchronously
66
        """
67

68
        async def execute_script_task(cb: Any, src_code: Text, pre_objs: Optional[dict] = None):
1✔
69
            try:
1✔
70
                data = CallbackProcessor.run_pyscript(script=src_code, predefined_objects=pre_objs)
1✔
71
                await cb({'result': data})
1✔
72
            except AppException as ex:
1✔
73
                await cb({'error': str(ex)})
1✔
74

75
        def run_async_task():
1✔
76
            loop = asyncio.new_event_loop()
1✔
77
            asyncio.set_event_loop(loop)
1✔
78
            loop.run_until_complete(execute_script_task(callback, script, predefined_objects))
1✔
79

80
        try:
1✔
81
            async_task_executor.submit(run_async_task)
1✔
82
        except AppException as e:
1✔
83
            raise AppException(f"Error while executing pyscript: {str(e)}")
1✔
84

85
    @staticmethod
1✔
86
    async def async_callback(obj: dict, ent: dict, cb: dict, c_src: str, bot_id: str, sid: str, chnl: str, rd: dict):
1✔
87
        try:
1✔
88
            if not obj:
1✔
89
                raise AppException("No response received from callback script")
1✔
90
            elif res := obj.get('result'):
1✔
91
                bot_response, state, invalidate, dispatch_bot_response = CallbackProcessor.parse_pyscript_data(res)
1✔
92
                CallbackData.update_state(ent['bot'], ent['identifier'], state, invalidate)
1✔
93
                if dispatch_bot_response and bot_response:
1✔
94
                    await ChannelMessageDispatcher.dispatch_message(bot_id, sid, bot_response, chnl)
1✔
95
                CallbackLog.create_success_entry(name=ent.get("action_name"),
1✔
96
                                                 bot=bot_id,
97
                                                 channel=chnl,
98
                                                 identifier=ent.get("identifier"),
99
                                                 pyscript_code=cb.get("pyscript_code"),
100
                                                 sender_id=sid,
101
                                                 log=str(bot_response),
102
                                                 request_data=rd,
103
                                                 metadata=ent.get("metadata"),
104
                                                 callback_url=ent.get("callback_url"),
105
                                                 callback_source=c_src)
106
            elif error := obj.get('error'):
1✔
107
                raise AppException(f"Error while executing pyscript: {error}")
1✔
108
            else:
109
                raise AppException("No response received from callback script")
×
110
        except Exception as e:
1✔
111
            error_msg = str(e)
1✔
112
            logger.exception(error_msg)
1✔
113
            CallbackLog.create_failure_entry(name=ent.get("action_name"),
1✔
114
                                             bot=bot_id,
115
                                             channel=chnl,
116
                                             identifier=ent.get("identifier"),
117
                                             pyscript_code=cb.get("pyscript_code"),
118
                                             sender_id=sid,
119
                                             error_log=error_msg,
120
                                             request_data=rd,
121
                                             metadata=ent.get("metadata"),
122
                                             callback_url=ent.get("callback_url"),
123
                                             callback_source=c_src)
124

125
    @staticmethod
1✔
126
    async def process_async_callback_request(token: str,
1✔
127
                                             identifier: Optional[str] = None,
128
                                             request_data: Optional[dict] = None,
129
                                             callback_source: Optional[str] = None):
130
        """
131
        Process async callback request
132
        """
133
        predefined_objects = {
1✔
134
            "req": request_data,
135
            "req_host": callback_source,
136
        }
137
        error_code = 0
1✔
138
        message = "success"
1✔
139
        data = None
1✔
140
        entry, callback = CallbackData.validate_entry(token, identifier, request_data.get('body'))
1✔
141
        predefined_objects.update(entry)
1✔
142
        bot = entry.get("bot")
1✔
143
        execution_mode = callback.get("execution_mode")
1✔
144
        response_type = callback.get("response_type", CallbackResponseType.KAIRON_JSON.value)
1✔
145
        try:
1✔
146
            if execution_mode == CallbackExecutionMode.ASYNC.value:
1✔
147
                logger.info(f"Executing async callback. Identifier: {entry.get('identifier')}")
1✔
148

149
                async def callback_function(rsp: dict):
1✔
150
                    copied_func = functools.partial(CallbackProcessor.async_callback, rsp, entry, callback, callback_source, bot, entry.get("sender_id"), entry.get("channel"), request_data)
1✔
151
                    await copied_func()
1✔
152

153
                CallbackProcessor.run_pyscript_async(script=callback.get("pyscript_code"),
1✔
154
                                                     predefined_objects=predefined_objects,
155
                                                     callback=callback_function)
156
            elif execution_mode == CallbackExecutionMode.SYNC.value:
1✔
157
                logger.info(f"Executing sync callback. Identifier: {entry.get('identifier')}")
1✔
158
                result = CallbackProcessor.run_pyscript(script=callback.get("pyscript_code"),
1✔
159
                                                        predefined_objects=predefined_objects)
160
                bot_response, state, invalidate, dispatch_bot_response = CallbackProcessor.parse_pyscript_data(result)
1✔
161
                CallbackData.update_state(entry['bot'], entry['identifier'], state, invalidate)
1✔
162
                data = bot_response
1✔
163
                logger.info(f'Pyscript output: {bot_response, state, invalidate}')
1✔
164
                if dispatch_bot_response and bot_response:
1✔
165
                    await ChannelMessageDispatcher.dispatch_message(bot, entry.get("sender_id"), data, entry.get("channel"))
1✔
166
                CallbackLog.create_success_entry(name=entry.get("action_name"),
1✔
167
                                                 bot=bot,
168
                                                 channel=entry.get("channel"),
169
                                                 identifier=entry.get("identifier"),
170
                                                 pyscript_code=callback.get("pyscript_code"),
171
                                                 sender_id=entry.get("sender_id"),
172
                                                 log=str(data),
173
                                                 request_data=request_data,
174
                                                 metadata=entry.get("metadata"),
175
                                                 callback_url=entry.get("callback_url"),
176
                                                 callback_source=callback_source)
177

178
        except AppException as e:
1✔
179
            error_code = 400
1✔
180
            message = str(e)
1✔
181
            CallbackLog.create_failure_entry(name=entry.get("action_name"),
1✔
182
                                             bot=bot,
183
                                             channel=entry.get("channel"),
184
                                             identifier=entry.get("identifier"),
185
                                             pyscript_code=callback.get("pyscript_code"),
186
                                             sender_id=entry.get("sender_id"),
187
                                             error_log=message,
188
                                             request_data=request_data,
189
                                             metadata=entry.get("metadata"),
190
                                             callback_url=entry.get("callback_url"),
191
                                             callback_source=callback_source)
192

193
        return data, message, error_code, response_type
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc