• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

digiteinfotech / kairon / 18519632840

15 Oct 2025 06:15AM UTC coverage: 91.093% (-0.01%) from 91.103%
18519632840

Pull #2212

github

web-flow
Merge 5d24aab02 into a993f0d72
Pull Request #2212: Update for environment variables

29066 of 31908 relevant lines covered (91.09%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.14
/kairon/chat/handlers/channels/whatsapp.py
1
import json
1✔
2
from typing import Optional, Dict, Text, Any, List, Union
1✔
3

4
from rasa.core.channels import OutputChannel, UserMessage
1✔
5
from starlette.requests import Request
1✔
6

7
from kairon.chat.agent_processor import AgentProcessor
1✔
8
from kairon.chat.handlers.channels.clients.whatsapp.factory import WhatsappFactory
1✔
9
from kairon.chat.handlers.channels.clients.whatsapp.cloud import WhatsappCloud
1✔
10
from kairon.chat.handlers.channels.messenger import MessengerHandler
1✔
11
import logging
1✔
12

13
from kairon.shared.chat.processor import ChatDataProcessor
1✔
14
from kairon import Utility
1✔
15
from kairon.shared.chat.user_media import UserMedia
1✔
16
from kairon.shared.concurrency.actors.factory import ActorFactory
1✔
17
from kairon.shared.constants import ChannelTypes, ActorType
1✔
18
from kairon.shared.models import User
1✔
19

20
# Module-level logger, named after this module, shared by every class below.
logger = logging.getLogger(__name__)
1✔
21

22

23
class Whatsapp:
    """Whatsapp input channel to parse incoming webhooks and send msgs.

    The channel is driven by a ``config`` dict from which this class reads
    ``bsp_type`` (defaults to ``"meta"``), ``access_token`` / ``api_key``,
    ``phone_number_id`` and ``app_secret``.
    """

    def __init__(self, config: dict) -> None:
        """Init whatsapp input channel.

        Args:
            config: channel configuration dict (see class docstring).
        """
        self.config = config
        # Most recent webhook "changes" item; get_business_phone_number_id()
        # reads the business phone number id out of it.
        self.last_message: Dict[Text, Any] = {}

    @classmethod
    def name(cls) -> Text:
        """Return the channel name (``ChannelTypes.WHATSAPP.value``)."""
        return ChannelTypes.WHATSAPP.value

    @staticmethod
    def _quick_reply_text(payload: Any) -> Text:
        """Build the ``/k_quick_reply_msg`` intent text for a button payload.

        The payload is JSON-encoded rather than spliced into the string so
        that quotes, backslashes and control characters are escaped. For
        payloads without such characters the output is identical to the
        previous hand-built string.
        """
        entity = json.dumps({"quick_reply": str(payload)}, ensure_ascii=False)
        return f"/k_quick_reply_msg{entity}"

    async def message(
            self, message: Dict[Text, Any], metadata: Optional[Dict[Text, Any]], bot: str
    ) -> None:
        """Handle an incoming event from the whatsapp webhook.

        Translates the webhook message into the text handed to the dialogue
        engine, stores any attached media through ``UserMedia``, merges
        ``metadata`` into ``message`` and forwards everything to
        ``_handle_user_message``. Unsupported message types are logged and
        dropped.

        Args:
            message: a single webhook message (or payment status) dict.
            metadata: extra metadata merged into ``message``; may be ``None``.
            bot: bot id the message belongs to.
        """

        # quick reply and user message both share 'text' attribute
        # so quick reply should be checked first
        media_ids = None
        message_type = message.get("type")
        if message_type == "interactive":
            interactive_type = message.get("interactive").get("type")
            if interactive_type == "nfm_reply":
                logger.debug(message["interactive"][interactive_type])
                response_json = json.loads(message["interactive"][interactive_type]['response_json'])
                response_json.update({"type": interactive_type})
                entity = json.dumps({"flow_reply": response_json})
                docs = response_json.get("documents", [])
                if docs:
                    # Flow replies carrying documents: persist each document
                    # and report the whatsapp media ids to the dialogue engine.
                    media_ids = []
                    media_id_list = []
                    for doc in docs:
                        media_id = doc["id"]
                        ids = UserMedia.save_whatsapp_media_content(
                            bot=bot,
                            sender_id=message["from"],
                            whatsapp_media_id=media_id,
                            config=self.config
                        )
                        media_id_list.append(media_id)
                        media_ids.extend(ids)
                    text = f"/k_multimedia_msg{{\"flow_docs\": \"{media_id_list}\"}}"
                else:
                    text = f"/k_interactive_msg{entity}"
            else:
                text = message["interactive"][interactive_type]["id"]
        elif message_type == "text":
            text = message["text"]['body']
        elif message_type == "button":
            button = message["button"]
            if button.get("payload") == button.get("text"):
                text = button["text"]
            else:
                text = self._quick_reply_text(button['payload'])
        elif message_type in {"image", "audio", "document", "video", "voice"}:
            if message_type == "voice":
                # Voice notes are reported to the bot as audio.
                message['type'] = message_type = "audio"
                # NOTE(review): some webhook formats appear to deliver the
                # media object under "voice" rather than "audio" - confirm.
                # "audio" is still preferred so existing payloads behave as before.
                media = message["audio"] if "audio" in message else message["voice"]
            else:
                media = message[message_type]
            media_id = media['id']
            text = f"/k_multimedia_msg{{\"{message_type}\": \"{media_id}\"}}"
            media_ids = UserMedia.save_whatsapp_media_content(
                bot=bot,
                sender_id=message["from"],
                whatsapp_media_id=media_id,
                config=self.config
            )
        elif message_type == "location":
            logger.debug(message['location'])
            text = f"/k_multimedia_msg{{\"latitude\": \"{message['location']['latitude']}\", \"longitude\": \"{message['location']['longitude']}\"}}"
        elif message_type == "order":
            logger.debug(message['order'])
            entity = json.dumps({message["type"]: message['order']})
            text = f"/k_order_msg{entity}"
        elif message_type == "payment":
            logger.debug(message['payment'])
            entity = json.dumps({message["type"]: message['payment']})
            text = f"/k_payment_msg{entity}"
        else:
            logger.warning(f"Received a message from whatsapp that we can not handle. Message: {message}")
            return
        # metadata is Optional: only merge when something was supplied.
        if metadata:
            message.update(metadata)
        await self._handle_user_message(text, message["from"], message, bot, media_ids)

    async def handle_meta_payload(self, payload: Dict, metadata: Optional[Dict[Text, Any]], bot: str) -> None:
        """Process every change of a webhook payload.

        For each change a client is created (stored on ``self.client``), the
        change's own metadata is merged into ``metadata`` and each message is
        passed to ``message``. When a change has no messages its statuses are
        written to the audit log, and payment statuses are additionally
        handled as messages.

        Args:
            payload: decoded webhook body; ``payload["entry"][i]["changes"]`` is iterated.
            metadata: request metadata, updated in place; may be ``None``.
            bot: bot id the payload belongs to.
        """
        if metadata is None:
            metadata = {}
        provider = self.config.get("bsp_type", "meta")
        access_token = self.__get_access_token()
        for entry in payload["entry"]:
            for changes in entry["changes"]:
                self.last_message = changes
                client = WhatsappFactory.get_client(provider)
                self.client = client(access_token, from_phone_number_id=self.get_business_phone_number_id())
                value = changes.get("value", {})
                metadata.update(value.get("metadata", {}))
                messages = value.get("messages")
                if not messages:
                    user = metadata.get('display_phone_number')
                    # A change may carry neither messages nor statuses;
                    # treat a missing list as empty instead of failing.
                    for status_data in value.get("statuses") or []:
                        recipient = status_data.get('recipient_id')
                        ChatDataProcessor.save_whatsapp_audit_log(status_data, bot, user, recipient,
                                                                  ChannelTypes.WHATSAPP.value)
                        if status_data.get('type') == "payment":
                            status_data["from"] = user
                            await self.message(status_data, metadata, bot)
                for message in messages or []:
                    await self.message(message, metadata, bot)

    async def send_message_to_user(self, message: Any, recipient_id: str):
        """Send a message to the user.

        Args:
            message: a plain string (sent as text with link preview), or a
                dict / list of dicts with ``type`` and ``data`` keys. Items
                whose ``type`` has no known content type are sent as text.
            recipient_id: recipient passed to the client's ``send``.

        Raises:
            ValueError: if ``phone_number_id`` is missing from the config and
                the provider is not ``360dialog``.
        """
        from kairon.chat.converters.channels.response_factory import ConverterFactory

        provider = self.config.get("bsp_type", "meta")
        is_bps = provider == "360dialog"
        client = WhatsappFactory.get_client(provider)
        phone_number_id = self.config.get('phone_number_id')
        if not phone_number_id and not is_bps:
            raise ValueError("Phone number not found in channel config")
        access_token = self.__get_access_token()
        c = client(access_token, from_phone_number_id=phone_number_id)
        if isinstance(message, str):
            c.send({'body': message, 'preview_url': True}, recipient_id, "text")
            return

        content_type = {"link": "text", "video": "video", "image": "image", "button": "interactive",
                        "dropdown": "interactive", "audio": "audio"}
        if isinstance(message, dict):
            message = [message]
        for item in message:
            message_type = content_type.get(item.get('type'))
            message_body = item.get('data')
            if not message_type:
                c.send({'body': f"{message_body}", 'preview_url': True}, recipient_id, "text")
            else:
                converter_instance = ConverterFactory.getConcreteInstance(item.get('type'), ChannelTypes.WHATSAPP.value)
                response = await converter_instance.messageConverter(message_body)
                c.send(response, recipient_id, message_type)

    async def handle_payload(self, request, metadata: Optional[Dict[Text, Any]], bot: str) -> str:
        """Validate an incoming webhook request and schedule its processing.

        For the ``meta`` provider the ``X-Hub-Signature`` header is checked
        against the raw body before the body is parsed, so unauthenticated
        content is never decoded. Valid payloads are handed to the callable
        runner actor, which executes ``handle_meta_payload``.

        Args:
            request: incoming request exposing ``body()``, ``json()`` and ``headers``.
            metadata: request metadata, updated in place with channel details.
            bot: bot id the request belongs to.

        Returns:
            ``"success"``, or ``"not validated"`` when the signature check fails.
        """
        msg = "success"
        provider = self.config.get("bsp_type", "meta")
        metadata.update({"channel_type": ChannelTypes.WHATSAPP.value, "bsp_type": provider, "tabname": "default"})
        request_bytes = await request.body()
        if provider == "meta":
            signature = request.headers.get("X-Hub-Signature") or ""
            if not MessengerHandler.validate_hub_signature(self.config["app_secret"], request_bytes, signature):
                logger.warning("Wrong app secret secret! Make sure this matches the secret in your whatsapp app settings.")
                return "not validated"

        # Parse only after the signature (when applicable) has been verified.
        payload = await request.json()
        actor = ActorFactory.get_instance(ActorType.callable_runner.value)
        actor.execute(self.handle_meta_payload, payload, metadata, bot)
        return msg

    def get_business_phone_number_id(self) -> Text:
        """Return ``value.metadata.phone_number_id`` of the last change, or ``""``."""
        return self.last_message.get("value", {}).get("metadata", {}).get("phone_number_id", "")

    async def _handle_user_message(
            self, text: Text, sender_id: Text, metadata: Optional[Dict[Text, Any]], bot: str,
            media_ids: Optional[List[str]] = None
    ) -> None:
        """Pass on the text to the dialogue engine for processing.

        Marks the incoming message (``metadata["id"]``) as read, wraps the
        text in a ``UserMessage`` and calls ``process_message``. Exceptions
        raised while processing are logged and not propagated.
        """
        out_channel = WhatsappBot(self.client)
        self.client.metadata = metadata
        await out_channel.mark_as_read(metadata["id"])
        user_msg = UserMessage(
            text, out_channel, sender_id, input_channel=self.name(), metadata=metadata
        )
        try:
            await self.process_message(bot, user_msg, media_ids)
        except Exception:
            # logger.exception already records the active exception and traceback.
            logger.exception("Exception when trying to handle webhook for whatsapp message.")

    @staticmethod
    async def process_message(bot: str, user_message: "UserMessage", media_ids: Optional[List[str]] = None):
        """Forward the user message to ``AgentProcessor.handle_channel_message``."""
        await AgentProcessor.handle_channel_message(bot, user_message, media_ids=media_ids)

    def __get_access_token(self):
        """Return ``access_token`` for the ``meta`` provider, else ``api_key``."""
        provider = self.config.get("bsp_type", "meta")
        if provider == "meta":
            return self.config.get('access_token')
        return self.config.get('api_key')
207

208

209
class WhatsappBot(OutputChannel):
    """A bot that uses whatsapp to communicate."""

    @classmethod
    def name(cls) -> Text:
        """Return the channel name (``ChannelTypes.WHATSAPP.value``)."""
        return ChannelTypes.WHATSAPP.value

    def __init__(self, whatsapp_client: WhatsappCloud) -> None:
        """Init whatsapp output channel.

        Args:
            whatsapp_client: client whose ``send`` / ``mark_as_read`` methods
                are used to talk to whatsapp.
        """
        self.whatsapp_client = whatsapp_client
        super().__init__()

    def send(self, recipient_id: Text, element: Any) -> None:
        """Sends a text-type message to the recipient using the whatsapp client."""

        # this is a bit hacky, but the client doesn't have a proper API to
        # send messages but instead expects the incoming sender to be present
        # which we don't have as it is stored in the input channel.
        self.whatsapp_client.send(element, recipient_id, "text")

    async def send_text_message(
            self, recipient_id: Text, text: Text, **kwargs: Any
    ) -> None:
        """Send a text message (with link preview enabled) through this channel."""

        self.send(recipient_id, {"preview_url": True, "body": text})

    async def send_image_url(
            self, recipient_id: Text, image: Text, **kwargs: Any
    ) -> None:
        """Sends an image by url.

        An explicit ``link`` keyword still takes precedence; otherwise the
        ``image`` argument is used. The payload is sent with the ``image``
        message type, since ``{"link": ...}`` is not a text message body.
        """
        link = kwargs.get("link") or image
        self.whatsapp_client.send({"link": link}, recipient_id, "image")

    async def mark_as_read(self, msg_id: Text) -> None:
        """Mark user message as read.
        Args:
            msg_id: message id
        """
        self.whatsapp_client.mark_as_read(msg_id)

    async def send_custom_json(
            self,
            recipient_id: Text,
            json_message: Union[List, Dict[Text, Any]],
            **kwargs: Any,
    ) -> None:
        """Sends custom json data to the output.

        A dict whose ``type`` is listed in the system metadata ``type_list``
        is converted with the matching converter and sent with the mapped
        content type; a failed send is logged and stored through
        ``ChatDataProcessor.save_whatsapp_failed_messages``. Anything else
        (including list input) is sent as its string form in a text message.

        Args:
            recipient_id: recipient passed to the client's ``send``.
            json_message: custom payload, normally a dict with ``type`` and ``data``.
            **kwargs: ``assistant_id`` is read as the bot id for failure logging.
        """
        # The signature allows a list, which has no ``.get``; fall back to text.
        messagetype = json_message.get("type") if isinstance(json_message, dict) else None
        type_list = Utility.system_metadata.get("type_list") or []
        if messagetype is None or messagetype not in type_list:
            self.send(recipient_id, {"preview_url": True, "body": str(json_message)})
            return

        from kairon.chat.converters.channels.response_factory import ConverterFactory

        content_type = {"link": "text", "video": "video", "image": "image", "button": "interactive",
                        "dropdown": "interactive", "audio": "audio", "formatText": "text"}
        messaging_type = content_type.get(messagetype)
        converter_instance = ConverterFactory.getConcreteInstance(messagetype, ChannelTypes.WHATSAPP.value)
        response = await converter_instance.messageConverter(json_message.get("data"))
        resp = self.whatsapp_client.send(response, recipient_id, messaging_type)

        # Nothing to record unless the client reported an error.
        if not resp or not resp.get("error"):
            return

        bot = kwargs.get("assistant_id")
        message_id = self.whatsapp_client.metadata.get("id")
        user = self.whatsapp_client.metadata.get("display_phone_number")
        if not bot:
            logger.error("Missing assistant_id in kwargs for failed message logging")
            return
        logger.error(f"WhatsApp message failed: {resp.get('error')}")
        try:
            ChatDataProcessor.save_whatsapp_failed_messages(
                resp, bot, recipient_id, ChannelTypes.WHATSAPP.value,
                json_message=json_message, message_id=message_id, user=user,
                metadata=self.whatsapp_client.metadata
            )
        except Exception as e:
            # Best effort: failing to persist the failure must not break the reply flow.
            logger.error(f"Failed to log WhatsApp error: {str(e)}")
287

288

289
class WhatsappHandler(MessengerHandler):
    """Whatsapp input channel implementation. Based on the HTTPInputChannel."""

    def __init__(self, bot: Text, user: User, request: Request):
        """Store the bot id, user and request this handler works on."""
        super().__init__(bot, user, request)
        self.bot = bot
        self.user = user
        self.request = request

    async def validate(self):
        """Answer a webhook verification request.

        Compares the ``hub.verify_token`` query parameter with the
        ``verify_token`` stored in the whatsapp channel config.

        Returns:
            The ``hub.challenge`` query parameter as an ``int`` when the token
            matches; otherwise a ``{"status": ...}`` failure dict. A missing or
            non-integer challenge is also reported as a failure dict instead
            of raising, because the value comes from untrusted query input.
        """
        messenger_conf = ChatDataProcessor.get_channel_config(ChannelTypes.WHATSAPP.value, self.bot, mask_characters=False)

        verify_token = messenger_conf["config"]["verify_token"]

        if self.request.query_params.get("hub.verify_token") != verify_token:
            logger.warning("Invalid verify token! Make sure this matches your webhook settings on the whatsapp app.")
            return {"status": "failure, invalid verify_token"}

        hub_challenge = self.request.query_params.get("hub.challenge")
        try:
            return int(hub_challenge)
        except (TypeError, ValueError):
            logger.warning("Missing or non-integer hub.challenge in whatsapp webhook verification request.")
            return {"status": "failure, invalid hub.challenge"}

    async def handle_message(self):
        """Handle an incoming webhook request for this bot.

        Loads the unmasked whatsapp channel config, enriches the request
        metadata with integration details and delegates to
        ``Whatsapp.handle_payload``.

        Returns:
            The status string returned by ``Whatsapp.handle_payload``.
        """
        channel_conf = ChatDataProcessor.get_channel_config(ChannelTypes.WHATSAPP.value, self.bot, mask_characters=False)
        whatsapp_channel = Whatsapp(channel_conf["config"])
        metadata = self.get_metadata(self.request) or {}
        metadata.update({"is_integration_user": True, "bot": self.bot, "account": self.user.account})
        return await whatsapp_channel.handle_payload(self.request, metadata, self.bot)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc