• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

digiteinfotech / kairon / 12112350604

02 Dec 2024 03:52AM UTC coverage: 89.891% (-0.04%) from 89.932%
12112350604

Pull #1611

github

web-flow
Merge 9176d03d1 into f2f296b80
Pull Request #1611: Mail channel implementation

383 of 434 new or added lines in 15 files covered. (88.25%)

12 existing lines in 2 files now uncovered.

24141 of 26856 relevant lines covered (89.89%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.25
/kairon/chat/utils.py
1
import datetime
1✔
2
import os
1✔
3
from typing import Text, Dict
1✔
4

5
from loguru import logger
1✔
6
from pymongo.collection import Collection
1✔
7
from pymongo.errors import ServerSelectionTimeoutError
1✔
8
from rasa.core.channels import UserMessage
1✔
9
from rasa.core.tracker_store import SerializedTrackerAsDict
1✔
10

11
from .agent_processor import AgentProcessor
1✔
12
from .. import Utility
1✔
13
from ..live_agent.factory import LiveAgentFactory
1✔
14
from ..shared.account.activity_log import UserActivityLogger
1✔
15
from ..shared.actions.utils import ActionUtility
1✔
16
from ..shared.constants import UserActivityType
1✔
17
from ..shared.live_agent.processor import LiveAgentsProcessor
1✔
18
from ..shared.metering.constants import MetricType
1✔
19
from ..shared.metering.metering_processor import MeteringProcessor
1✔
20

21

22
class ChatUtils:
    """
    Static helpers for the chat flow: sending user messages to a bot's agent,
    reloading the agent, attaching live-agent handoff metadata to responses,
    reading the last conversation session from MongoDB and building message
    metadata.
    """

    @staticmethod
    async def chat(
        data: Text,
        account: int,
        bot: Text,
        user: Text,
        is_integration_user: bool = False,
        metadata: Dict = None,
    ):
        """
        Send one user message to the bot and return the agent's response.

        :param data: message text sent by the user
        :param account: account id, stored in the message metadata
        :param bot: bot id
        :param user: sender id of the message
        :param is_integration_user: flag stored in the message metadata
        :param metadata: optional extra metadata merged into the message metadata
        :return: the agent response with an ``agent_handoff`` entry attached, or a
            fixed payload saying the message was delivered to a live agent when
            the agent returned no response
        """
        model = AgentProcessor.get_agent(bot)
        metadata = ChatUtils.get_metadata(account, bot, is_integration_user, metadata)
        msg = UserMessage(data, sender_id=user, metadata=metadata)
        chat_response = await AgentProcessor.handle_channel_message(bot, msg)
        if not chat_response:
            return {
                "success": True,
                "message": "user message delivered to live agent."
            }
        await ChatUtils.__attach_agent_handoff_metadata(
            account, bot, user, chat_response, model.tracker_store
        )
        return chat_response

    @staticmethod
    async def process_messages_via_bot(
            messages: [str],
            account: int,
            bot: str,
            user: str,
            is_integration_user: bool = False,
            metadata: Dict = None,
    ):
        """
        Run several messages through the bot, one after another.

        The agent is obtained through ``AgentProcessor.get_agent_without_cache``
        and every message shares the same metadata dict.

        :param messages: message texts to process, in order
        :param account: account id, stored in the message metadata
        :param bot: bot id
        :param user: sender id used for every message
        :param is_integration_user: flag stored in the message metadata
        :param metadata: optional extra metadata merged into the message metadata
        :return: list with one agent response per input message, in input order
        """
        responses = []
        uncached_model = AgentProcessor.get_agent_without_cache(bot, False)
        metadata = ChatUtils.get_metadata(account, bot, is_integration_user, metadata)
        for message in messages:
            msg = UserMessage(message, sender_id=user, metadata=metadata)
            chat_response = await uncached_model.handle_message(msg)
            responses.append(chat_response)
        return responses

    @staticmethod
    def reload(bot: Text, user: Text):
        """
        Reload the bot's agent and record the attempt in the user activity log.

        A failure is logged and recorded with status "Failed"; it is not
        re-raised to the caller.

        :param bot: bot id
        :param user: user who triggered the reload
        """
        exc = None
        status = "Success"
        try:
            AgentProcessor.reload(bot)
        except Exception as e:
            logger.error(e)
            exc = str(e)
            status = "Failed"
        finally:
            # The activity log entry is written for successful and failed reloads alike.
            UserActivityLogger.add_log(
                a_type=UserActivityType.model_reload.value,
                email=user,
                bot=bot,
                data={
                    "username": user,
                    "process_id": os.getpid(),
                    "exception": exc,
                    "status": status,
                },
            )

    @staticmethod
    async def __attach_agent_handoff_metadata(
        account: int, bot: Text, sender_id: Text, bot_predictions, tracker
    ):
        """
        Decide whether the conversation should be handed to a live agent and
        store the outcome under ``bot_predictions["agent_handoff"]``.

        :param account: account id, used when recording the metric
        :param bot: bot id
        :param sender_id: sender id of the conversation
        :param bot_predictions: agent response; mutated in place
        :param tracker: tracker store used to fetch the conversation so far
        :return: handoff metadata with keys ``initiate``, ``type`` and
            ``additional_properties`` (plus ``businessworking`` when the handoff
            is declined outside working hours)
        """
        metadata = {"initiate": False, "type": None, "additional_properties": None}
        exception = None
        should_initiate_handoff = False
        try:
            config = LiveAgentsProcessor.get_config(
                bot, mask_characters=False, raise_error=False
            )
            if config:
                metadata["type"] = config["agent_type"]
                should_initiate_handoff = ChatUtils.__should_initiate_handoff(
                    bot_predictions, config
                )
                if should_initiate_handoff:
                    metadata["initiate"] = True
                    live_agent = LiveAgentFactory.get_agent(
                        config["agent_type"], config["config"]
                    )
                    metadata["additional_properties"] = live_agent.initiate_handoff(
                        bot, sender_id
                    )
                    businessdata = live_agent.getBusinesshours(
                        config, metadata["additional_properties"]["inbox_id"]
                    )
                    if businessdata is not None and businessdata.get(
                        "working_hours_enabled"
                    ):
                        # NOTE(review): naive UTC timestamp; utcnow() is deprecated
                        # since Python 3.12. Kept because the expectations of
                        # validate_businessworkinghours are not visible here.
                        current_utcnow = datetime.datetime.utcnow()
                        workingstatus = live_agent.validate_businessworkinghours(
                            businessdata, current_utcnow
                        )
                        if not workingstatus:
                            # Outside working hours: report the out-of-office
                            # message and cancel the handoff. Clearing the flag
                            # also keeps the ``finally`` block from recording a
                            # handoff metric.
                            metadata.update(
                                {
                                    "businessworking": businessdata[
                                        "out_of_office_message"
                                    ]
                                }
                            )
                            metadata["initiate"] = False
                            bot_predictions["agent_handoff"] = metadata
                            should_initiate_handoff = False
                            return metadata
                    message_trail = await ChatUtils.__retrieve_conversation(
                        tracker, sender_id
                    )
                    live_agent.send_conversation_log(
                        message_trail, metadata["additional_properties"]["destination"]
                    )
        except Exception as e:
            logger.exception(e)
            exception = str(e)
            metadata["initiate"] = False
        finally:
            # A metric is recorded when a handoff was initiated or when the
            # attempt failed with an exception.
            if not Utility.check_empty_string(exception) or should_initiate_handoff:
                MeteringProcessor.add_metrics(
                    bot,
                    account,
                    MetricType.agent_handoff,
                    sender_id=sender_id,
                    agent_type=metadata.get("type"),
                    bot_predictions=bot_predictions,
                    exception=exception,
                )

        bot_predictions["agent_handoff"] = metadata
        return metadata

    @staticmethod
    async def __retrieve_conversation(tracker, sender_id: Text):
        """
        Fetch the sender's tracker and convert its events into a message trail.

        :param tracker: tracker store exposing an awaitable ``retrieve``
        :param sender_id: sender id of the conversation
        :return: message trail built by ``ActionUtility.prepare_message_trail``
        """
        events = SerializedTrackerAsDict.serialise_tracker(
            await tracker.retrieve(sender_id)
        )
        _, message_trail = ActionUtility.prepare_message_trail(events.get("events"))
        return message_trail

    @staticmethod
    def __should_initiate_handoff(bot_predictions, agent_handoff_config):
        """
        Tell whether the prediction should trigger a live-agent handoff.

        :param bot_predictions: agent response holding the predicted intent under
            ``nlu.intent.name`` and the predicted actions under ``action``
        :param agent_handoff_config: live agent configuration with
            ``override_bot`` and, optionally, ``trigger_on_intents`` and
            ``trigger_on_actions``
        :return: truthy when ``override_bot`` is set, or the predicted intent or
            any predicted action is listed as a trigger
        """
        predicted_intent = bot_predictions["nlu"]["intent"]["name"]
        predicted_actions = {
            action.get("action_name") for action in bot_predictions["action"]
        }
        trigger_on_intent = predicted_intent in set(
            agent_handoff_config.get("trigger_on_intents", [])
        )
        trigger_on_action = not predicted_actions.isdisjoint(
            agent_handoff_config.get("trigger_on_actions", [])
        )
        return (
            agent_handoff_config["override_bot"]
            or trigger_on_intent
            or trigger_on_action
        )

    @staticmethod
    def get_last_session_conversation(bot: Text, sender_id: Text):
        """
        List conversation events in last session.

        Reads the collection named after the bot, finds the sender's latest
        ``session_started`` event and returns the ``flattened`` records with a
        later timestamp, sorted by timestamp and grouped by
        ``metadata.tabname``.

        :param bot: bot id
        :param sender_id: user id
        :return: tuple ``(events, message)``; ``events`` is a list of
            ``{"tabname": ..., "events": [...]}`` groups (empty when no session
            exists or on failure) and ``message`` is ``None`` on success or a
            description of the failure
        """
        events = []
        message = None

        try:
            config = Utility.get_local_db()
            with Utility.create_mongo_client(config) as client:
                db = client.get_database(config['db'])
                conversations = db.get_collection(bot)
                logger.debug(
                    f"Loading host: {config['host']}, db:{db.name}, collection: {bot},env: {Utility.environment['env']}"
                )
                last_session = ChatUtils.get_last_session(conversations, sender_id)
                logger.debug(f"last session: {last_session}")
                if not last_session:
                    return events, message
                events = list(
                    conversations.aggregate(
                        [
                            {
                                "$match": {
                                    "sender_id": sender_id,
                                    "timestamp": {
                                        "$gt": last_session["event"]["timestamp"]
                                    },
                                    "type": {
                                        "$in": ["flattened"]
                                    },
                                }
                            },
                            {
                                "$addFields": {"_id": {"$toString": "$_id"}}
                            },
                            {"$sort": {"timestamp": 1}},
                            {
                                "$group": {
                                    "_id": "$metadata.tabname",
                                    "events": {"$push": "$$ROOT"}
                                }
                            },
                            {
                                "$project": {
                                    "_id": 0,
                                    "tabname": "$_id",
                                    "events": "$events",
                                }
                            },
                        ]
                    )
                )
        except Exception as e:
            # Connection timeouts and every other failure are reported the same
            # way: logged, then returned to the caller as a message.
            logger.exception(e)
            message = f"Failed to retrieve conversation: {e}"
        return events, message

    @staticmethod
    def get_last_session(conversations: Collection, sender_id: Text):
        """
        Find the sender's most recent ``session_started`` event.

        :param conversations: collection holding the bot's conversation records
        :param sender_id: user id
        :return: document ``{"_id": sender_id, "event": <event>}`` for the
            ``session_started`` event with the highest ``event.timestamp``, or
            ``None`` when the sender has no such event
        """
        last_session = list(
            conversations.aggregate(
                [
                    {
                        "$match": {
                            "sender_id": sender_id,
                            "event.event": "session_started",
                        }
                    },
                    {"$sort": {"event.timestamp": 1}},
                    {"$group": {"_id": "$sender_id", "event": {"$last": "$event"}}},
                ]
            )
        )
        return last_session[0] if last_session else None

    @staticmethod
    def get_metadata(
        account: int,
        bot: Text,
        is_integration_user: bool = False,
        metadata: Dict = None,
    ):
        """
        Build the metadata dict attached to a user message.

        The caller's dict is never modified; a shallow copy is extended instead.

        :param account: account id
        :param bot: bot id
        :param is_integration_user: whether the sender is an integration user
        :param metadata: optional caller-supplied metadata to start from
        :return: new dict with the caller's entries, ``tabname`` defaulting to
            "default", and ``is_integration_user``, ``bot``, ``account`` and
            ``channel_type`` ("chat_client") overriding any caller values
        """
        merged = dict(metadata) if metadata else {}
        if not merged.get("tabname"):
            merged["tabname"] = "default"
        merged.update(
            {
                "is_integration_user": is_integration_user,
                "bot": bot,
                "account": account,
                "channel_type": "chat_client",
            }
        )
        return merged

    @staticmethod
    def add_telemetry_metadata(x_telemetry_uid: Text, x_telemetry_sid: Text, metadata: Dict = None):
        """
        Add telemetry identifiers to message metadata.

        The caller's dict is never modified; a shallow copy is extended instead.

        :param x_telemetry_uid: telemetry user id
        :param x_telemetry_sid: telemetry session id
        :param metadata: optional caller-supplied metadata to start from
        :return: new dict with the caller's entries, plus ``telemetry-uid`` and
            ``telemetry-sid`` when both identifiers are provided
        """
        enriched = dict(metadata) if metadata else {}
        if x_telemetry_uid and x_telemetry_sid:
            #TODO: validate x_telemetry_uid and x_session_id for the botid
            enriched["telemetry-uid"] = x_telemetry_uid
            enriched["telemetry-sid"] = x_telemetry_sid
        return enriched
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc