• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

digiteinfotech / kairon / 12112350604

02 Dec 2024 03:52AM UTC coverage: 89.891% (-0.04%) from 89.932%
12112350604

Pull #1611

github

web-flow
Merge 9176d03d1 into f2f296b80
Pull Request #1611: Mail channel implementation

383 of 434 new or added lines in 15 files covered. (88.25%)

12 existing lines in 2 files now uncovered.

24141 of 26856 relevant lines covered (89.89%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.56
/kairon/shared/channels/mail/processor.py
1
import asyncio
1✔
2
import re
1✔
3

4
from apscheduler.schedulers.background import BackgroundScheduler
1✔
5
from loguru import logger
1✔
6
from pydantic.schema import timedelta
1✔
7
from pydantic.validators import datetime
1✔
8
from imap_tools import MailBox, AND
1✔
9
from kairon.exceptions import AppException
1✔
10
from kairon.shared.account.data_objects import Bot
1✔
11
from kairon.shared.channels.mail.constants import MailConstants
1✔
12
from kairon.shared.channels.mail.data_objects import MailClassificationConfig
1✔
13
from kairon.shared.chat.processor import ChatDataProcessor
1✔
14
from kairon.shared.constants import ChannelTypes
1✔
15
from kairon.shared.data.data_objects import BotSettings
1✔
16
from kairon.shared.llm.processor import LLMProcessor
1✔
17
import json
1✔
18
from email.mime.text import MIMEText
1✔
19
from email.mime.multipart import MIMEMultipart
1✔
20
import smtplib
1✔
21

22

23

24
class MailProcessor:
    """
    Mail channel handler for a single bot.

    Reads unseen mails over IMAP, classifies them through the configured LLM,
    forwards the resulting intents to the bot and sends the rendered replies
    over SMTP.
    """

    def __init__(self, bot):
        """
        Load the mail channel configuration and related settings of the bot.

        Parameters:
        - bot: str - bot id
        """
        self.config = ChatDataProcessor.get_channel_config(ChannelTypes.MAIL, bot, False)['config']
        self.llm_type = self.config.get('llm_type', "openai")
        self.hyperparameters = self.config.get('hyperparameters', MailConstants.DEFAULT_HYPERPARAMETERS)
        self.bot = bot
        bot_info = Bot.objects.get(id=bot)
        self.account = bot_info.account
        self.llm_processor = LLMProcessor(self.bot, self.llm_type)
        self.mail_configs = list(MailClassificationConfig.objects(bot=self.bot))
        # intent name -> classification config, used to look up reply templates
        self.mail_configs_dict = {item.intent: item for item in self.mail_configs}
        self.bot_settings = BotSettings.objects(bot=self.bot).get()
        # connections are opened lazily by login_imap() / login_smtp()
        self.mailbox = None
        self.smtp = None

    def login_imap(self):
        """
        Open the IMAP mailbox using the configured account; no-op if already open.
        """
        if self.mailbox:
            return
        email_account = self.config['email_account']
        email_password = self.config['email_password']
        imap_server = self.config.get('imap_server', MailConstants.DEFAULT_IMAP_SERVER)
        self.mailbox = MailBox(imap_server).login(email_account, email_password)

    def logout_imap(self):
        """
        Close the IMAP mailbox if one is open.

        The handle is always cleared, even when the logout call itself raises,
        so that a broken connection is never reused.
        """
        if self.mailbox:
            try:
                self.mailbox.logout()
            finally:
                self.mailbox = None

    def login_smtp(self):
        """
        Open an SMTP connection (STARTTLS + login); no-op if already connected.

        The connection is stored on the instance only after the handshake and
        the login have succeeded. On failure the socket is closed and the
        original exception is re-raised.
        """
        if self.smtp:
            return
        email_account = self.config['email_account']
        email_password = self.config['email_password']
        smtp_server = self.config.get('smtp_server', MailConstants.DEFAULT_SMTP_SERVER)
        smtp_port = int(self.config.get('smtp_port', MailConstants.DEFAULT_SMTP_PORT))
        smtp = smtplib.SMTP(smtp_server, smtp_port, timeout=30)
        try:
            smtp.starttls()
            smtp.login(email_account, email_password)
        except Exception:
            # do not keep (or leak) a connection that never got authenticated
            smtp.close()
            raise
        self.smtp = smtp

    def logout_smtp(self):
        """
        Close the SMTP connection if one is open.

        The handle is always cleared, even when quit() raises.
        """
        if self.smtp:
            try:
                self.smtp.quit()
            finally:
                self.smtp = None

    async def send_mail(self, to: str, subject: str, body: str):
        """
        Send an HTML mail from the configured account.

        Failures are logged and not raised, so one failing recipient does not
        abort the other mails of a batch. login_smtp() must have been called
        before; otherwise the failure is logged like any other send error.

        Parameters:
        - to: str - recipient address
        - subject: str - mail subject
        - body: str - html body
        """
        try:
            email_account = self.config['email_account']
            msg = MIMEMultipart()
            msg['From'] = email_account
            msg['To'] = to
            msg['Subject'] = subject
            msg.attach(MIMEText(body, 'html'))
            self.smtp.sendmail(email_account, to, msg.as_string())
        except Exception as e:
            logger.error(f"Error sending mail to {to}: {str(e)}")

    def process_mail(self, intent: str, rasa_chat_response: dict):
        """
        Render the reply body for a classified mail.

        Parameters:
        - intent: str - intent the mail was classified as
        - rasa_chat_response: dict - bot response; 'slots' is read as a list
          of "key: value" strings and 'response' as a list of dicts with 'text'

        Returns:
        - reply template of the intent (or the default template) formatted
          with the slot values and 'bot_response'
        """
        slots = {}
        for slot_str in rasa_chat_response.get('slots') or []:
            key, separator, value = slot_str.partition(":")
            # entries without a ':' carry no value and are ignored
            if separator:
                slots[key.strip()] = value.strip()

        # responses without text (or with a null text) contribute an empty part
        responses = '<br/><br/>'.join(response.get('text') or ''
                                      for response in rasa_chat_response.get('response') or [])
        slots['bot_response'] = responses
        mail_template = self.mail_configs_dict.get(intent, None)
        if mail_template and mail_template.reply_template:
            mail_template = mail_template.reply_template
        else:
            mail_template = MailConstants.DEFAULT_TEMPLATE

        return mail_template.format(**{key: str(value) for key, value in slots.items()})

    async def classify_messages(self, messages: [dict]) -> [dict]:
        """
        Classify a batch of mails through the LLM.

        Parameters:
        - messages: list of message dicts (see read_mails)

        Returns:
        - first JSON value found in the LLM answer (the prompt asks for a list
          of dicts with intent, entities, mail_id and subject)
        - None when 'enable_faq' is off in the bot's llm settings

        Raises:
        - AppException if the prediction fails or the answer holds no JSON
        """
        if self.bot_settings.llm_settings['enable_faq']:
            try:
                system_prompt = self.config.get('system_prompt', MailConstants.DEFAULT_SYSTEM_PROMPT)
                system_prompt += '\n return json format: [{"intent": "intent_name", "entities": {"entity_name": "value"}, "mail_id": "mail_id", "subject": "subject"}], if not classifiable set intent and not-found entity values as null'
                context_prompt = self.get_context_prompt()
                messages = json.dumps(messages)
                info = await self.llm_processor.predict(messages,
                                                        self.bot_settings.user,
                                                        system_prompt=system_prompt,
                                                        context_prompt=context_prompt,
                                                        similarity_prompt=[],
                                                        hyperparameters=self.hyperparameters)
                classifications = MailProcessor.extract_jsons_from_text(info["content"])[0]
                return classifications
            except Exception as e:
                logger.error(str(e))
                raise AppException(str(e)) from e
        return None

    @staticmethod
    async def process_messages(bot: str, batch: [dict]):
        """
        classify and respond to a batch of messages

        Parameters:
        - bot: str - bot id
        - batch: list of message dicts (see read_mails)

        Raises:
        - AppException wrapping any failure outside the per-mail handling
        """
        try:
            from kairon.chat.utils import ChatUtils
            mp = MailProcessor(bot)
            classifications = await mp.classify_messages(batch)
            user_messages: [str] = []
            responses = []
            intents = []
            for classification in classifications:
                try:
                    intent = classification['intent']
                    if not intent or intent == 'null':
                        continue
                    entities = classification['entities']
                    sender_id = classification['mail_id']
                    subject = f"{classification['subject']}"

                    # mail_id is in the format "name <email>" or "email"
                    if '<' in sender_id:
                        sender_id = sender_id.split('<')[1].split('>')[0]

                    # json.dumps escapes quotes/backslashes inside entity values,
                    # which a hand-built string would turn into invalid JSON
                    entity_payload = {key: str(value) for key, value in entities.items()
                                      if value and value != 'null'}
                    user_msg = f'/{intent}{json.dumps(entity_payload, ensure_ascii=False)}'
                    logger.info(user_msg)

                    user_messages.append(user_msg)
                    responses.append({
                        'to': sender_id,
                        'subject': subject,
                    })
                    intents.append(intent)
                except Exception as e:
                    # a malformed classification only skips that single mail
                    logger.exception(e)
            logger.info(responses)

            chat_responses = await ChatUtils.process_messages_via_bot(user_messages,
                                                                mp.account,
                                                                bot,
                                                                mp.bot_settings.user,
                                                                False,
                                                                {
                                                                    'channel': ChannelTypes.MAIL.value
                                                                })
            logger.info(chat_responses)

            for index, response in enumerate(chat_responses):
                responses[index]['body'] = mp.process_mail(intents[index], response)

            mp.login_smtp()
            try:
                tasks = [mp.send_mail(**response) for response in responses]
                await asyncio.gather(*tasks)
            finally:
                # always release the SMTP connection, even if sending was interrupted
                mp.logout_smtp()

        except Exception as e:
            raise AppException(str(e)) from e

    def get_context_prompt(self) -> str:
        """
        Build the LLM context prompt listing, for every classification config,
        its intent, entities, subjects and classification rule.
        """
        return "".join(
            f"intent: {item['intent']} \n"
            f"entities: {item['entities']} \n"
            "\nclassification criteria: \n"
            f"subjects: {item['subjects']} \n"
            f"rule: {item['classification_prompt']} \n"
            "\n\n"
            for item in self.mail_configs
        )

    @staticmethod
    def process_message_task(bot: str, message_batch: [dict]):
        """
        Process a batch of messages
        used for execution by executor
        """
        asyncio.run(MailProcessor.process_messages(bot, message_batch))

    @staticmethod
    def read_mails(bot: str) -> ([dict], str, int):
        """
        Read mails from the mailbox
        Parameters:
        - bot: str - bot id
        Returns:
        - list of messages - each message is a dict with the following
            - mail_id
            - subject
            - date
            - body
        - user
        - time_shift

        On any error while logging in or fetching, the error is logged and an
        empty message list is returned.
        """
        mp = MailProcessor(bot)
        time_shift = int(mp.config.get('interval', 20 * 60))
        last_read_timestamp = datetime.now() - timedelta(seconds=time_shift)
        messages = []
        try:
            mp.login_imap()
            # the search criterion is built from the date part of the timestamp only
            msgs = mp.mailbox.fetch(AND(seen=False, date_gte=last_read_timestamp.date()))
            for msg in msgs:
                subject = msg.subject
                sender_id = msg.from_
                date = msg.date
                body = msg.text or msg.html or ""
                # the subject is untrusted input: pass it as an argument, never
                # as the format string of the log call
                logger.info("{} {} {}", subject, sender_id, date)
                messages.append({
                    'mail_id': sender_id,
                    'subject': subject,
                    'date': str(date),
                    'body': body
                })
            return messages, mp.bot_settings.user, time_shift
        except Exception as e:
            logger.exception(e)
            return [], mp.bot_settings.user, time_shift
        finally:
            # logout_imap() is a no-op when the login never succeeded; a failing
            # logout must not discard the messages that were already read
            try:
                mp.logout_imap()
            except Exception as e:
                logger.exception(e)

    @staticmethod
    def extract_jsons_from_text(text) -> list:
        """
        Extract json objects from text as a list

        The text is scanned for '{' and '[' and a JSON value is decoded from
        each such position, so nested objects and arrays are returned whole.
        Openers that do not start valid JSON are skipped.
        """
        opener = re.compile(r'[\[{]')
        decoder = json.JSONDecoder()
        jsons = []
        position = 0
        while True:
            match = opener.search(text, position)
            if match is None:
                break
            try:
                json_obj, position = decoder.raw_decode(text, match.start())
            except json.JSONDecodeError:
                # not valid JSON from this opener; retry from the next character
                position = match.start() + 1
                continue
            jsons.append(json_obj)
        return jsons
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc