• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

EsupPortail / Esup-Pod / 27555155862

15 Jun 2026 02:56PM UTC coverage: 69.164%. First build
27555155862

Pull #1427

github

web-flow
Merge 6619d6495 into 82a38ac96
Pull Request #1427: [Release] 4.3.0

718 of 994 new or added lines in 20 files covered. (72.23%)

13628 of 19704 relevant lines covered (69.16%)

0.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.64
/pod/video_encode_transcript/utils.py
1
"""Esup-Pod video encoding and transcripting utilities."""
2

3
import logging
1✔
4
import os
1✔
5
import time
1✔
6

7
import bleach
1✔
8
from django.conf import settings
1✔
9
from django.core.mail import EmailMultiAlternatives, mail_admins, mail_managers, send_mail
1✔
10
from django.urls import reverse
1✔
11
from django.utils.translation import gettext_lazy as _
1✔
12

13
from pod.progressive_web_app.utils import notify_user
1✔
14
from pod.video.models import Video
1✔
15

16
from .models import EncodingLog, EncodingStep
1✔
17

18
# Mirror the project's DEBUG flag (assumed on when the setting is missing);
# while it is on, this module's logger emits DEBUG-level records.
DEBUG = getattr(settings, "DEBUG", True)
logger = logging.getLogger(__name__)
if DEBUG:
    logger.setLevel(logging.DEBUG)

# Branding values; the dict below is only the fallback used when the project
# settings do not define TEMPLATE_VISIBLE_SETTINGS.
TEMPLATE_VISIBLE_SETTINGS = getattr(
    settings,
    "TEMPLATE_VISIBLE_SETTINGS",
    dict(
        TITLE_SITE="Pod",
        TITLE_ETB="University name",
        LOGO_SITE="img/logoPod.svg",
        LOGO_ETB="img/esup-pod.svg",
        LOGO_PLAYER="img/pod_favicon.svg",
        LINK_PLAYER="",
        LINK_PLAYER_NAME=_("Home"),
        FOOTER_TEXT=("",),
        FAVICON="img/pod_favicon.svg",
        CSS_OVERRIDE="",
        PRE_HEADER_TEMPLATE="",
        POST_FOOTER_TEMPLATE="",
        TRACKING_TEMPLATE="",
    ),
)

# Site title used in mail subjects; a missing or empty value falls back to "Pod".
__TITLE_SITE__ = TEMPLATE_VISIBLE_SETTINGS.get("TITLE_SITE") or "Pod"

DEFAULT_FROM_EMAIL = getattr(settings, "DEFAULT_FROM_EMAIL", "noreply@univ.fr")
# Placeholder values that _admin_failure_emails_are_enabled() treats as
# "email not configured".
DEFAULT_PLACEHOLDER_EMAIL_HOST = "smtp.univ.fr"
DEFAULT_PLACEHOLDER_ADMINS = (("Name", "adminmail@univ.fr"),)

USE_ESTABLISHMENT_FIELD = getattr(settings, "USE_ESTABLISHMENT_FIELD", False)

MANAGERS = getattr(settings, "MANAGERS", {})

SECURE_SSL_REDIRECT = getattr(settings, "SECURE_SSL_REDIRECT", False)
VIDEOS_DIR = getattr(settings, "VIDEOS_DIR", "videos")
# MEDIA_ROOT and DEFAULT_RECORDER_PATH are two of the roots accepted by
# _is_safe_file_path(); an empty value disables the corresponding root.
MEDIA_ROOT = getattr(settings, "MEDIA_ROOT", "")
DEFAULT_RECORDER_PATH = getattr(settings, "DEFAULT_RECORDER_PATH", "")
61

62

63
def _is_safe_file_path(path_file) -> bool:
    """Tell whether ``path_file`` resolves under an authorized local root.

    The authorized roots are ``MEDIA_ROOT``, ``DEFAULT_RECORDER_PATH`` and
    ``"/tmp"``; roots that are empty are ignored. Both the candidate and the
    roots go through ``os.path.realpath`` before being compared.

    Returns:
        True when the resolved path is a root itself or lies below one,
        False otherwise (including for empty or non path-like input).
    """
    if not path_file:
        return False
    try:
        resolved = os.path.realpath(os.fspath(path_file))
    except (TypeError, ValueError, OSError):
        # Not path-like, or a path the OS layer refuses to resolve.
        return False

    for base in (MEDIA_ROOT, DEFAULT_RECORDER_PATH, "/tmp"):
        if not base:
            continue
        base_real = os.path.realpath(base)
        try:
            inside = os.path.commonpath([resolved, base_real]) == base_real
        except ValueError:
            # commonpath() refuses paths it cannot compare (for instance
            # paths on different drives); try the next root.
            continue
        if inside:
            return True
    return False
83

84

85
# ##########################################################################
86
# ENCODE VIDEO: GENERIC FUNCTIONS
87
# ##########################################################################
88

89

90
def change_encoding_step(video_id: int, num_step: int, desc: str) -> None:
    """Record the current encoding step of a video.

    Fetches (or creates) the EncodingStep row attached to the video, stores
    the step number and its description, then saves it.

    Args:
        video_id: primary key of the Video being encoded.
        num_step: number of the step that has just been reached.
        desc: description of the step; only its first 255 characters are
            stored, the full text is still written to the debug log.
    """
    encoding_step, _created = EncodingStep.objects.get_or_create(
        video=Video.objects.get(id=video_id)
    )
    encoding_step.num_step = num_step
    encoding_step.desc_step = desc[:255]
    encoding_step.save()
    # Pass the values as logger arguments so the message is only formatted
    # when DEBUG records are actually emitted.
    logger.debug("Video: %s - step: %d - desc: %s", video_id, num_step, desc)
99

100

101
def add_encoding_log(video_id, log) -> None:
    """Append ``log`` to the encoding log of the video ``video_id``.

    The EncodingLog row is created on first use; later messages are appended
    to the existing text, separated by a blank line. The message is also
    written to the module logger at DEBUG level.
    """
    video = Video.objects.get(id=video_id)
    encoding_log, is_new = EncodingLog.objects.get_or_create(video=video)
    encoding_log.log = log if is_new else encoding_log.log + "\n\n%s" % log
    encoding_log.save()
    logger.debug(log)
112

113

114
def check_file(path_file) -> bool:
    """Tell whether ``path_file`` is an allowed, existing, non-empty file.

    The path must first be accepted by ``_is_safe_file_path``; it is then
    resolved with ``os.path.realpath`` and must be a regular file whose size
    is greater than zero. Any OSError or ValueError raised while probing the
    file is reported as False.
    """
    if not _is_safe_file_path(path_file):
        return False
    resolved = os.path.realpath(os.fspath(path_file))
    try:
        if not os.path.isfile(resolved):
            return False
        if not os.access(resolved, os.F_OK):
            return False
        return os.stat(resolved).st_size > 0
    except (OSError, ValueError):
        return False
127

128

129
def create_outputdir(video_id, video_path) -> str:
    """ENCODE VIDEO: CREATE OUTPUT DIR.

    The output directory sits next to the video file and is named after the
    video id, zero-padded to four digits (e.g. ``0007``).

    Args:
        video_id: integer id of the video (formatted with ``%04d``).
        video_path: path of the source video file.

    Returns:
        The path of the output directory, which exists on return.

    Raises:
        OSError: if the directory cannot be created (for instance when a
            regular file already uses that name).
    """
    output_dir = os.path.join(os.path.dirname(video_path), "%04d" % video_id)
    # exist_ok avoids the check-then-create race when several workers
    # prepare the same directory at the same time.
    os.makedirs(output_dir, exist_ok=True)
    return output_dir
136

137

138
###############################################################
139
# EMAIL
140
###############################################################
141

142

143
def _admin_failure_emails_are_enabled() -> bool:
    """Tell whether admin alert emails are worth sending.

    The project ships with placeholder SMTP/admin settings. While ADMINS is
    empty or still equal to the placeholder, or EMAIL_HOST is blank or still
    the placeholder host, sending a failure email would only trigger an
    unnecessary SMTP connection attempt, so the answer is False.
    """
    configured_admins = tuple(getattr(settings, "ADMINS", ()) or ())
    if not configured_admins or configured_admins == DEFAULT_PLACEHOLDER_ADMINS:
        return False

    smtp_host = str(getattr(settings, "EMAIL_HOST", "") or "").strip()
    return bool(smtp_host) and smtp_host != DEFAULT_PLACEHOLDER_EMAIL_HOST
163

164

165
def send_email_item(msg, item, item_id) -> None:
    """Send email notification when encoding fails for a specific item.

    Nothing is sent (a warning is logged instead) while the admin/SMTP
    settings are still unconfigured placeholders.

    Args:
        msg: plain-text error details; converted with ``str()`` for the
            HTML part so non-string values do not crash the alert.
        item: kind of object that failed, e.g. "Video" or "Recording".
        item_id: identifier of the failed object.
    """
    from html import escape

    if not _admin_failure_emails_are_enabled():
        logger.warning(
            "Skipping admin alert email for %s %s because SMTP/admin email settings are not configured.",
            item,
            item_id,
        )
        return

    headline = "Error Encoding %s id: %s" % (item, item_id)
    subject = "[" + __TITLE_SITE__ + "] " + headline
    message = "%s\n%s" % (headline, msg)
    # Escape before adding <br>: error output often contains "<...>"
    # fragments that would otherwise vanish or be interpreted as markup.
    html_message = "<p>%s</p><p>%s</p>" % (
        escape(headline),
        escape(str(msg)).replace("\n", "<br>"),
    )
    mail_admins(subject, message, fail_silently=False, html_message=html_message)
183

184

185
def send_email_recording(msg, recording_id) -> None:
    """Report a failed recording encoding through ``send_email_item``."""
    send_email_item(msg, item="Recording", item_id=recording_id)
188

189

190
def send_email_encoding(video_to_encode) -> None:
    """Email the completion of the encoding of ``video_to_encode``."""
    send_notification_email(video_to_encode, subject_prefix=_("Encoding"))
194

195

196
def send_notification_encoding(video_to_encode) -> None:
    """Push-notify the completion of the encoding of ``video_to_encode``."""
    send_notification(video_to_encode, subject_prefix=_("Encoding"))
200

201

202
def send_email(msg, video_id) -> None:
    """Report a failed video encoding through ``send_email_item``."""
    send_email_item(msg, item="Video", item_id=video_id)
205

206

207
def send_email_transcript(video_to_encode) -> None:
    """Email the completion of the transcripting of ``video_to_encode``."""
    send_notification_email(
        video_to_encode, subject_prefix=_("The transcripting of content")
    )
211

212

213
def send_notification_email(video_to_encode, subject_prefix) -> None:
    """Send email notification on video encoding or transcripting completion.

    Args:
        video_to_encode: the video whose processing is finished; its id,
            title, owner, date_added and get_full_url() are used.
        subject_prefix: translated prefix of the subject. The value
            ``_("The transcripting of content")`` selects the transcripting
            wording, anything else the encoding wording.

    Delivery:
        * When USE_ESTABLISHMENT_FIELD is on and the owner's establishment
          (lower-cased) is a key of MANAGERS, one message goes to the owner
          with that establishment's manager(s) in BCC.
        * Otherwise the managers get the message through mail_managers(),
          with the "Post by"/"the" footer, and the owner gets a separate
          message only when DEBUG is off.
    """
    from html import escape

    logger.debug("SEND EMAIL ON %s COMPLETION", subject_prefix.upper())
    is_transcript = subject_prefix == _("The transcripting of content")
    url_scheme = "https" if SECURE_SSL_REDIRECT else "http"
    # NOTE(review): presumably get_full_url() returns a scheme-relative
    # "//host/path" URL, hence the "scheme:" prefix — confirm on the model.
    content_url = "%s:%s" % (url_scheme, video_to_encode.get_full_url())
    subject = "[%s] %s" % (
        __TITLE_SITE__,
        _("%(subject)s #%(content_id)s completed")
        % {"subject": subject_prefix, "content_id": video_to_encode.id},
    )

    html_message = (
        '<p>%s</p><p>%s</p><p>%s<br><a href="%s"><i>%s</i></a>\
                </p><p>%s</p>'
        % (
            _("Hello,"),
            _(
                "%(content_type)s “%(content_title)s” has been %(action)s"
                + ", and is now available on %(site_title)s."
            )
            % {
                "content_type": (
                    _("The content") if is_transcript else _("The video")
                ),
                # The title is user-supplied: escape it before it is
                # embedded in the HTML body.
                "content_title": "<b>%s</b>" % escape(str(video_to_encode.title)),
                "action": (
                    _("automatically transcripted")
                    if is_transcript
                    else _("encoded to Web formats")
                ),
                "site_title": __TITLE_SITE__,
            },
            _("You will find it here:"),
            content_url,
            content_url,
            _("Regards."),
        )
    )

    # Managers additionally get who posted the content and when.
    full_html_message = html_message + "<br>%s%s<br>%s%s" % (
        _("Post by:"),
        escape(str(video_to_encode.owner)),
        _("the:"),
        video_to_encode.date_added,
    )

    # Plain-text alternatives: same content with every tag stripped.
    message = bleach.clean(html_message, tags=[], strip=True)
    full_message = bleach.clean(full_html_message, tags=[], strip=True)

    from_email = DEFAULT_FROM_EMAIL
    to_email = [video_to_encode.owner.email]

    # Only look at the owner's establishment when the feature is enabled
    # and managers are configured.
    managers = dict(MANAGERS) if (USE_ESTABLISHMENT_FIELD and MANAGERS) else {}
    establishment = None
    if managers:
        establishment = video_to_encode.owner.owner.establishment.lower()

    if establishment in managers:
        manager = managers[establishment]
        if isinstance(manager, (list, tuple)):
            bcc_email = manager
        elif isinstance(manager, str):
            bcc_email = [manager]
        else:
            bcc_email = []
        msg = EmailMultiAlternatives(
            subject, message, from_email, to_email, bcc=bcc_email
        )
        msg.attach_alternative(html_message, "text/html")
        msg.send()
    else:
        mail_managers(
            subject,
            full_message,
            fail_silently=False,
            html_message=full_html_message,
        )
        if not DEBUG:
            send_mail(
                subject,
                message,
                from_email,
                to_email,
                fail_silently=False,
                html_message=html_message,
            )
301

302

303
def send_notification(video_to_encode, subject_prefix) -> None:
    """Send push notification on video encoding or transcripting completion.

    Args:
        video_to_encode: the video whose processing is finished; its id,
            title, owner and slug are used.
        subject_prefix: translated prefix of the subject. Both
            ``_("Transcripting")`` and ``_("The transcripting of content")``
            (the prefix used by the email notifications of this module)
            select the transcripting wording; anything else selects the
            encoding wording.
    """
    is_transcript = subject_prefix in (
        _("Transcripting"),
        _("The transcripting of content"),
    )
    subject = "[%s] %s" % (
        __TITLE_SITE__,
        _("%(subject)s #%(content_id)s completed")
        % {"subject": subject_prefix, "content_id": video_to_encode.id},
    )
    message = _(
        "%(content_type)s “%(content_title)s” has been %(action)s"
        + ", and is now available on %(site_title)s."
    ) % {
        "content_type": _("content") if is_transcript else _("video"),
        "content_title": video_to_encode.title,
        "action": (
            _("automatically transcripted")
            if is_transcript
            else _("encoded to Web formats")
        ),
        "site_title": __TITLE_SITE__,
    }

    notify_user(
        video_to_encode.owner,
        subject,
        message,
        url=reverse("video:video", args=(video_to_encode.slug,)),
    )
332

333

334
def time_to_seconds(a_time) -> int:
    """Return the number of seconds represented by ``a_time``.

    ``a_time`` is converted with ``str()`` and must then read ``HH:MM:SS``;
    ``time.strptime`` raises ValueError for anything else.
    """
    parsed = time.strptime(str(a_time), "%H:%M:%S")
    return parsed.tm_hour * 3600 + parsed.tm_min * 60 + parsed.tm_sec
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc