• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

digiteinfotech / kairon / 12112350604

02 Dec 2024 03:52AM UTC coverage: 89.891% (-0.04%) from 89.932%
12112350604

Pull #1611

github

web-flow
Merge 9176d03d1 into f2f296b80
Pull Request #1611: Mail channel implementation

383 of 434 new or added lines in 15 files covered. (88.25%)

12 existing lines in 2 files now uncovered.

24141 of 26856 relevant lines covered (89.89%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.36
/kairon/shared/channels/mail/scheduler.py
1
import asyncio
1✔
2
from datetime import datetime, timedelta
1✔
3
from urllib.parse import urljoin
1✔
4

5
from apscheduler.jobstores.mongodb import MongoDBJobStore
1✔
6
from apscheduler.schedulers.background import BackgroundScheduler
1✔
7
from pymongo import MongoClient
1✔
8

9
from kairon import Utility
1✔
10
from kairon.events.definitions.mail_channel_schedule import MailChannelScheduleEvent
1✔
11
from kairon.exceptions import AppException
1✔
12
from kairon.shared.channels.mail.processor import MailProcessor
1✔
13
from kairon.shared.chat.data_objects import Channels
1✔
14
from kairon.shared.constants import ChannelTypes
1✔
15
from loguru import logger
1✔
16

17

18
class MailScheduler:
    """Schedule recurring mailbox polling for bots that have a mail channel.

    All state is kept on the class itself, so every caller in the process
    shares one scheduler and one set of scheduled bots.
    """

    # Shared BackgroundScheduler; created lazily by the first ``epoch()`` call.
    scheduler = None
    # Bots for which a polling job has been queued by ``epoch()``.
    scheduled_bots = set()

    @staticmethod
    def epoch() -> bool:
        """Create the scheduler if needed and queue polling for new mail bots.

        Looks up every ``Channels`` document whose connector type is
        ``ChannelTypes.MAIL``, queues a one-shot ``process_mails`` job (five
        seconds from now) for each bot that is not yet in ``scheduled_bots``,
        and then drops bots that no longer have a mail channel from
        ``scheduled_bots``.

        Returns:
            ``True`` when this call created and started the scheduler,
            ``False`` when the scheduler already existed.
        """
        is_initialized = False
        if not MailScheduler.scheduler:
            is_initialized = True
            client = MongoClient(Utility.environment['database']['url'])
            events_db = Utility.environment['events']['queue']['mail_queue_name']
            job_store_name = Utility.environment['events']['scheduler']['mail_scheduler_collection']

            MailScheduler.scheduler = BackgroundScheduler(
                jobstores={job_store_name: MongoDBJobStore(events_db, job_store_name, client)},
                job_defaults={'coalesce': True, 'misfire_grace_time': 7200},
            )

        bots = {
            channel['bot']
            for channel in Channels.objects(connector_type=ChannelTypes.MAIL)
        }

        for bot in bots - MailScheduler.scheduled_bots:
            first_schedule_time = datetime.now() + timedelta(seconds=5)
            MailScheduler.scheduler.add_job(
                MailScheduler.process_mails,
                'date',
                args=[bot, MailScheduler.scheduler],
                run_date=first_schedule_time,
            )
            MailScheduler.scheduled_bots.add(bot)

        # Forget bots whose mail channel was removed; their pending job exits
        # early in ``process_mails`` because the bot is no longer in the set.
        MailScheduler.scheduled_bots = MailScheduler.scheduled_bots.intersection(bots)

        if is_initialized:
            # Start only once, after the initial jobs have been queued.
            MailScheduler.scheduler.start()
            return True
        return False

    @staticmethod
    def request_epoch() -> None:
        """Ask the event server to run an epoch via ``/api/mail/request_epoch``.

        Raises:
            AppException: if the response's ``success`` field is falsy.
        """
        event_server_url = Utility.get_event_server_url()
        resp = Utility.execute_http_request(
            "GET",
            urljoin(
                event_server_url,
                "/api/mail/request_epoch",
            ),
            err_msg="Failed to request epoch",
        )
        if not resp['success']:
            raise AppException("Failed to request email channel epoch")

    @staticmethod
    def process_mails(bot, scheduler: BackgroundScheduler = None) -> None:
        """Poll one bot's mailbox, then queue the next poll for that bot.

        Does nothing when ``bot`` is not in ``scheduled_bots``, which ends the
        polling chain for that bot.

        Args:
            bot: bot identifier whose mailbox should be read.
            scheduler: forwarded unchanged as an argument of the next job.
                The next job itself is added to the class-level
                ``MailScheduler.scheduler``.
        """
        if bot not in MailScheduler.scheduled_bots:
            return
        logger.info(f"MailScheduler: Processing mails for bot {bot}")
        next_timestamp = MailScheduler.read_mailbox_and_schedule_events(bot)
        # NOTE(review): if reading the mailbox raises, nothing in this method
        # re-queues the job - confirm whether that is intended.
        MailScheduler.scheduler.add_job(
            MailScheduler.process_mails,
            'date',
            args=[bot, scheduler],
            run_date=next_timestamp,
        )
        # Re-sync with the channel configuration after every poll.
        MailScheduler.epoch()

    @staticmethod
    def read_mailbox_and_schedule_events(bot) -> datetime:
        """Read the bot's mailbox and enqueue one schedule event per mail.

        Args:
            bot: bot identifier passed to ``MailProcessor.read_mails``.

        Returns:
            The local time at which the mailbox should be read next: now plus
            the delay (in seconds) reported by ``MailProcessor.read_mails``.
        """
        emails, user, next_delay = MailProcessor.read_mails(bot)
        # Log only metadata; the raw mails were previously dumped with print().
        logger.debug(f"MailScheduler: mailbox read for bot {bot}, next poll in {next_delay} seconds")
        for email in emails:
            MailChannelScheduleEvent(bot, user).enqueue(mails=[email])
        return datetime.now() + timedelta(seconds=next_delay)
1✔
85

86

87

88

89

90

91

92

93

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc