• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

digiteinfotech / kairon / 13115292153

03 Feb 2025 02:07PM UTC coverage: 90.041% (-0.2%) from 90.222%
13115292153

Pull #1779

github

web-flow
Merge eabf68bed into 0e29863a1
Pull Request #1779: Callback Service Change

193 of 272 new or added lines in 6 files covered. (70.96%)

5 existing lines in 2 files now uncovered.

24656 of 27383 relevant lines covered (90.04%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.38
/kairon/async_callback/processor.py
1
import asyncio
1✔
2
import functools
1✔
3
from concurrent.futures import ThreadPoolExecutor
1✔
4
from typing import Optional, Any, Text
1✔
5

6
from loguru import logger
1✔
7
from kairon import Utility
1✔
8
from kairon.async_callback.channel_message_dispacher import ChannelMessageDispatcher
1✔
9
from kairon.async_callback.lambda_function import lambda_handler
1✔
10
from kairon.evaluator.processor import EvaluatorProcessor
1✔
11
from kairon.exceptions import AppException
1✔
12
from kairon.shared.callback.data_objects import CallbackData, CallbackConfig, CallbackLog, CallbackExecutionMode
1✔
13
from kairon.shared.cloud.utils import CloudUtility
1✔
14
from kairon.shared.constants import EventClass
1✔
15
from kairon.shared.data.constant import TASK_TYPE
1✔
16

17
async_task_executor = ThreadPoolExecutor(max_workers=64)
1✔
18

19

20
class CallbackProcessor:
1✔
21
    @staticmethod
1✔
22
    def run_pyscript(script: str, predefined_objects: dict):
1✔
23
        """
24
        Run python script
25
        """
26
        trigger_task = Utility.environment['async_callback_action']['pyscript']['trigger_task']
1✔
27
        try:
1✔
28
            if trigger_task:
1✔
29
                logger.info("Triggering lambda for pyscript evaluation")
1✔
30
                response = lambda_handler({
1✔
31
                    'source_code': script,
32
                    'predefined_objects': predefined_objects
33
                }, None)
34
                if response["statusCode"] != 200:
1✔
NEW
35
                    err = response.get('body') or response
×
UNCOV
36
                    raise AppException(f"{err}")
×
37
                result = response.get('body')
1✔
38
                return result
1✔
39
            else:
40
                logger.info("Triggering local_evaluator for pyscript evaluation")
1✔
41
                result = EvaluatorProcessor.evaluate_pyscript(source_code=script, predefined_objects=predefined_objects)
1✔
42
                return result
1✔
43
        except AppException as e:
×
44
            raise AppException(f"Error while executing pyscript: {str(e)}")
×
45

46
    @staticmethod
1✔
47
    def parse_pyscript_data(data: dict):
1✔
48
        bot_response = data.get('bot_response')
1✔
49
        state = data.get('state')
1✔
50
        invalidate = data.get('invalidate')
1✔
51
        return bot_response, state, invalidate
1✔
52

53
    @staticmethod
1✔
54
    def run_pyscript_async(script: str, predefined_objects: dict, callback: Any):
1✔
55
        """
56
        Run python script asynchronously
57
        """
58

59
        async def execute_script_task(cb: Any, src_code: Text, pre_objs: Optional[dict] = None):
1✔
60
            try:
1✔
61
                data = CallbackProcessor.run_pyscript(script=src_code, predefined_objects=pre_objs)
1✔
62
                await cb({'result': data})
1✔
63
            except AppException as ex:
1✔
64
                await cb({'error': str(ex)})
1✔
65

66
        def run_async_task():
1✔
67
            loop = asyncio.new_event_loop()
1✔
68
            asyncio.set_event_loop(loop)
1✔
69
            loop.run_until_complete(execute_script_task(callback, script, predefined_objects))
1✔
70

71
        try:
1✔
72
            async_task_executor.submit(run_async_task)
1✔
73
        except AppException as e:
1✔
74
            raise AppException(f"Error while executing pyscript: {str(e)}")
1✔
75

76
    @staticmethod
1✔
77
    async def async_callback(obj: dict, ent: dict, cb: dict, c_src: str, bot_id: str, sid: str, chnl: str, rd: dict):
1✔
78
        try:
1✔
79
            if not obj:
1✔
80
                raise AppException("No response received from callback script")
×
81
            elif res := obj.get('result'):
1✔
82
                bot_response, state, invalidate = CallbackProcessor.parse_pyscript_data(res)
1✔
83
                CallbackData.update_state(ent['bot'], ent['identifier'], state, invalidate)
1✔
84
                if bot_response:
1✔
85
                    await ChannelMessageDispatcher.dispatch_message(bot_id, sid, bot_response, chnl)
1✔
86
                CallbackLog.create_success_entry(name=ent.get("action_name"),
1✔
87
                                                 bot=bot_id,
88
                                                 channel=chnl,
89
                                                 identifier=ent.get("identifier"),
90
                                                 pyscript_code=cb.get("pyscript_code"),
91
                                                 sender_id=sid,
92
                                                 log=str(bot_response),
93
                                                 request_data=rd,
94
                                                 metadata=ent.get("metadata"),
95
                                                 callback_url=ent.get("callback_url"),
96
                                                 callback_source=c_src)
97
            elif error := obj.get('error'):
1✔
98
                raise AppException(f"Error while executing pyscript: {error}")
1✔
99
            else:
100
                raise AppException("No response received from callback script")
×
101
        except Exception as e:
1✔
102
            error_msg = str(e)
1✔
103
            logger.exception(error_msg)
1✔
104
            CallbackLog.create_failure_entry(name=ent.get("action_name"),
1✔
105
                                             bot=bot_id,
106
                                             channel=chnl,
107
                                             identifier=ent.get("identifier"),
108
                                             pyscript_code=cb.get("pyscript_code"),
109
                                             sender_id=sid,
110
                                             error_log=error_msg,
111
                                             request_data=rd,
112
                                             metadata=ent.get("metadata"),
113
                                             callback_url=ent.get("callback_url"),
114
                                             callback_source=c_src)
115

116
    @staticmethod
1✔
117
    async def process_async_callback_request(token: str,
1✔
118
                                             identifier: Optional[str] = None,
119
                                             request_data: Optional[dict] = None,
120
                                             callback_source: Optional[str] = None):
121
        """
122
        Process async callback request
123
        """
124
        predefined_objects = {
1✔
125
            "req": request_data,
126
            "req_host": callback_source,
127
        }
128
        error_code = 0
1✔
129
        message = "success"
1✔
130
        data = None
1✔
131
        entry, callback = CallbackData.validate_entry(token, identifier, request_data.get('body'))
1✔
132
        predefined_objects.update(entry)
1✔
133
        bot = entry.get("bot")
1✔
134
        execution_mode = callback.get("execution_mode")
1✔
135
        try:
1✔
136
            if execution_mode == CallbackExecutionMode.ASYNC.value:
1✔
137
                logger.info(f"Executing async callback. Identifier: {entry.get('identifier')}")
1✔
138

139
                async def callback_function(rsp: dict):
1✔
140
                    copied_func = functools.partial(CallbackProcessor.async_callback, rsp, entry, callback, callback_source, bot, entry.get("sender_id"), entry.get("channel"), request_data)
×
141
                    await copied_func()
×
142

143
                CallbackProcessor.run_pyscript_async(script=callback.get("pyscript_code"),
1✔
144
                                                     predefined_objects=predefined_objects,
145
                                                     callback=callback_function)
146
            elif execution_mode == CallbackExecutionMode.SYNC.value:
1✔
147
                logger.info(f"Executing sync callback. Identifier: {entry.get('identifier')}")
1✔
148
                result = CallbackProcessor.run_pyscript(script=callback.get("pyscript_code"),
1✔
149
                                                        predefined_objects=predefined_objects)
150
                bot_response, state, invalidate = CallbackProcessor.parse_pyscript_data(result)
1✔
151
                CallbackData.update_state(entry['bot'], entry['identifier'], state, invalidate)
1✔
152
                data = bot_response
1✔
153
                logger.info(f'Pyscript output: {bot_response, state, invalidate}')
1✔
154
                if data:
1✔
155
                    await ChannelMessageDispatcher.dispatch_message(bot, entry.get("sender_id"), data, entry.get("channel"))
1✔
156
                CallbackLog.create_success_entry(name=entry.get("action_name"),
1✔
157
                                                 bot=bot,
158
                                                 channel=entry.get("channel"),
159
                                                 identifier=entry.get("identifier"),
160
                                                 pyscript_code=callback.get("pyscript_code"),
161
                                                 sender_id=entry.get("sender_id"),
162
                                                 log=str(data),
163
                                                 request_data=request_data,
164
                                                 metadata=entry.get("metadata"),
165
                                                 callback_url=entry.get("callback_url"),
166
                                                 callback_source=callback_source)
167

168
        except AppException as e:
1✔
169
            error_code = 400
1✔
170
            message = str(e)
1✔
171
            CallbackLog.create_failure_entry(name=entry.get("action_name"),
1✔
172
                                             bot=bot,
173
                                             channel=entry.get("channel"),
174
                                             identifier=entry.get("identifier"),
175
                                             pyscript_code=callback.get("pyscript_code"),
176
                                             sender_id=entry.get("sender_id"),
177
                                             error_log=message,
178
                                             request_data=request_data,
179
                                             metadata=entry.get("metadata"),
180
                                             callback_url=entry.get("callback_url"),
181
                                             callback_source=callback_source)
182

183
        return data, message, error_code
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc