• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 17710149774

14 Sep 2025 10:49AM UTC coverage: 71.376% (-3.1%) from 74.434%
17710149774

push

github

mborsetti
Version 3.31.1.post2

1383 of 2314 branches covered (59.77%)

Branch coverage included in aggregate %.

4614 of 6088 relevant lines covered (75.79%)

5.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.98
/webchanges/mailer.py
1
"""Email handler."""
2

3
# The code below is subject to the license contained in the LICENSE file, which is part of the source code.
4

5
from __future__ import annotations
8✔
6

7
import base64
8✔
8
import getpass
8✔
9
import logging
8✔
10
import re
8✔
11
import smtplib
8✔
12
import subprocess
8✔
13
from dataclasses import dataclass
8✔
14
from email import policy
8✔
15
from email.message import EmailMessage
8✔
16
from email.utils import formatdate
8✔
17
from pathlib import Path
8✔
18
from types import ModuleType
8✔
19

20
try:
8✔
21
    import keyring
8✔
22
except ImportError as e:  # pragma: no cover
23
    keyring = str(e)  # type: ignore[assignment]
24

25
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
8✔
26

27

28
class Mailer:
8✔
29
    """Mailer class."""
30

31
    def send(self, msg: EmailMessage) -> None:
8✔
32
        """Send a message.
33

34
        :param msg: The message to be sent.
35
        :raises NotImplementedError: Use a subclass of EmailMessage to send a message.
36
        """
37
        raise NotImplementedError
38

39
    @staticmethod
8✔
40
    def msg(
8✔
41
        from_email: str, to_email: str, subject: str, text_body: str, html_body: str | None = None, utf_8: bool = True
42
    ) -> EmailMessage:
43
        """Create an Email object for a message.
44

45
        :param from_email: The 'From' email address
46
        :param to_email: The 'To' email address
47
        :param subject: The 'Subject' of the email
48
        :param text_body: The body in text format
49
        :param html_body: The body in html format (optional)
50
        :param utf_8: Whether to format the message using SMTPUTF8 (optional)
51
        """
52

53
        def extract_inline_images(html_body: str) -> tuple[str, dict[str, bytes]]:
8✔
54
            """Extract inline images from the email.
55

56
            :param html_body: The HTML with inline images.
57

58
            :return: The HTML with src tags and a dictionary of cid and file.
59
            """
60
            cid_dict: dict[str, bytes] = {}
×
61
            cid_counter = 1
×
62

63
            def replace_img(match: re.Match) -> str:
×
64
                """Function to replace the matched img tags with src="cid:<...>"> and to add the cid and the image to
65
                the cid_dict object.
66
                """
67
                nonlocal cid_counter
68
                image_format, image_data_b64 = match.groups()
×
69
                image_data = base64.b64decode(image_data_b64)
×
70
                image_cid = f'image{cid_counter}_{image_format.split(";")[0]}'
×
71
                cid_dict[image_cid] = image_data
×
72
                new_img_tag = f'src="cid:{image_cid}"'
×
73
                cid_counter += 1
×
74
                return new_img_tag
×
75

76
            edited_html = re.sub(r'src="data:image/(.+?);base64,(.+?)"', replace_img, html_body)
×
77
            return edited_html, cid_dict
×
78

79
        msg = EmailMessage(policy=policy.SMTPUTF8 if utf_8 else policy.SMTP)
8✔
80
        msg['From'] = from_email
8✔
81
        msg['To'] = to_email
8✔
82
        msg['Subject'] = subject
8✔
83
        msg['Date'] = formatdate(localtime=True)
8✔
84
        msg.set_content(text_body, subtype='plain')
8✔
85
        if html_body is not None:
8!
86
            if ';base64,' not in html_body:
8!
87
                msg.add_alternative(html_body, subtype='html')
8✔
88
            else:
89
                html_body, cid_dict = extract_inline_images(html_body)
×
90
                msg.add_alternative(html_body, subtype='html')
×
91
                payloads: EmailMessage = msg.get_payload()[1]  # type: ignore[assignment,index]
×
92
                for image_cid, image_data in cid_dict.items():
×
93
                    payloads.add_related(
×
94
                        image_data,
95
                        maintype='image',
96
                        subtype=image_cid.split('_')[-1],
97
                        disposition='inline',
98
                        filename=image_cid,
99
                        cid=f'<{image_cid}>',
100
                    )
101
        return msg
8✔
102

103

104
@dataclass
class SMTPMailer(Mailer):
    """Mailer that delivers messages through an SMTP server.

    :param smtp_user: The username for the SMTP server.
    :param smtp_server: The address of the SMTP server.
    :param smtp_port: The port of the SMTP server.
    :param tls: Whether tls is to be used to connect to the SMTP server.
    :param auth: Whether authentication is to be used with the SMTP server.
    :param insecure_password: The password for the SMTP server (optional, to be used only if no keyring is present).
    """

    smtp_user: str
    smtp_server: str
    smtp_port: int
    tls: bool
    auth: bool
    insecure_password: str | None = None

    def send(self, msg: EmailMessage | None) -> None:
        """Connect to the SMTP server, log in if required, and send the message if one is given.

        :param msg: The message to be sent. Optional in order to allow server login testing.
        :raises ValueError: If authentication is requested but no password can be obtained, either from
           ``insecure_password`` or from the keyring.
        """
        password = ''
        if self.auth:
            if self.insecure_password:
                password = self.insecure_password
            else:
                # ``keyring`` is only a module when its import succeeded.
                if not isinstance(keyring, ModuleType):
                    raise ValueError(f'No password available for {self.smtp_server} {self.smtp_user}')
                stored_password = keyring.get_password(self.smtp_server, self.smtp_user)
                if stored_password is None:
                    raise ValueError(f'No password available in keyring for {self.smtp_server} {self.smtp_user}')
                password = stored_password

        with smtplib.SMTP(self.smtp_server, self.smtp_port) as connection:
            connection.ehlo()
            if self.tls:
                # NOTE(review): starttls() is called without an explicit SSL context - confirm that the default
                # context's certificate verification behaviour is acceptable.
                connection.starttls()
            if self.auth:
                connection.login(self.smtp_user, password)
            if msg:
                connection.send_message(msg)
                logger.info(f'SMTP email sent to {msg.get("to")} via {self.smtp_server}')
×
150

151

152
@dataclass
class SendmailMailer(Mailer):
    """The Mailer class to use sendmail executable.

    :param sendmail_path: The path of the sendmail executable.
    """

    sendmail_path: str | Path

    def send(self, msg: EmailMessage) -> None:
        """Send a message via the sendmail executable.

        The envelope recipients are the addresses parsed from the 'To' header(s); when a 'From' header is present,
        its address is passed as the envelope sender (``-f``). A non-zero exit status of sendmail is logged as an
        error and not raised.

        :param msg: The message to be sent.
        """
        # Local import: the module-level import block only brings in ``formatdate`` from email.utils.
        from email.utils import getaddresses, parseaddr

        # Parse the header instead of splitting on ',' so that a quoted display name containing a comma (e.g.
        # '"Doe, John" <john@example.com>') yields one address rather than two broken fragments.
        recipients = [address for _, address in getaddresses(msg.get_all('To', [])) if address]
        if not recipients:
            logger.error("Sendmail failed with no recipient address found in the 'To' header")
            return

        command: list[str | Path] = [self.sendmail_path, '-oi']
        from_header = msg['From']
        if from_header:
            # Pass only the address as envelope sender; fall back to the raw header if it cannot be parsed.
            sender = parseaddr(str(from_header))[1] or str(from_header)
            command += ['-f', sender]
        command += recipients

        p = subprocess.run(  # noqa: S603 subprocess call - check for execution of untrusted input.
            command,
            input=msg.as_string(),
            capture_output=True,
            text=True,
        )
        if p.returncode:
            logger.error(f'Sendmail failed with {p.stderr}')
×
175

176

177
def smtp_have_password(smtp_server: str, from_email: str) -> bool:
    """Report whether the keyring holds a password for the email service.

    :param smtp_server: The address of the SMTP server.
    :param from_email: The email address of the sender.
    :returns: True if the keyring password is set; False if it is not set or if ``keyring`` is a string rather than
       the imported module.
    """
    if not isinstance(keyring, str):
        stored_password = keyring.get_password(smtp_server, from_email)
        return stored_password is not None

    return False
8✔
188

189

190
def smtp_set_password(smtp_server: str, from_email: str) -> None:
    """Prompt for the email service password and store it in the keyring. Interactive.

    :param smtp_server: The address of the SMTP server.
    :param from_email: The email address of the sender.
    :raises ImportError: If ``keyring`` is a string rather than the imported module; the string is included in the
       error message.
    """
    if not isinstance(keyring, str):
        entered_password = getpass.getpass(prompt=f'Enter password for {from_email} using {smtp_server}: ')
        keyring.set_password(smtp_server, from_email, entered_password)
        return

    raise ImportError(f"Python package 'keyring' cannot be loaded - service unsupported\n{keyring}")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc