• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zestedesavoir / zds-site / 8625617051

31 Mar 2024 04:43PM UTC coverage: 88.752% (+0.05%) from 88.699%
8625617051

push

github

web-flow
Ajoute la modification du titre et du sous-titre d'une publication depuis une modale (#6590)

4740 of 5934 branches covered (79.88%)

126 of 126 new or added lines in 5 files covered. (100.0%)

3 existing lines in 2 files now uncovered.

16515 of 18608 relevant lines covered (88.75%)

1.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.29
/zds/tutorialv2/publication_utils.py
1
import contextlib
3✔
2
import copy
3✔
3
import logging
3✔
4
import os
3✔
5
import shutil
3✔
6
import subprocess
3✔
7
import zipfile
3✔
8
from datetime import datetime
3✔
9
from os import makedirs, path
3✔
10
from pathlib import Path
3✔
11

12
import requests
3✔
13
from django.core.exceptions import ObjectDoesNotExist
3✔
14
from django.template.defaultfilters import date
3✔
15
from django.template.loader import render_to_string
3✔
16
from django.utils import translation
3✔
17
from django.utils.translation import gettext_lazy as _
3✔
18
from django.conf import settings
3✔
19
from zds.tutorialv2 import signals
3✔
20
from zds.tutorialv2.epub_utils import build_ebook
3✔
21
from zds.tutorialv2.models.database import ContentReaction, PublishedContent, PublicationEvent
3✔
22
from zds.tutorialv2.publish_container import publish_use_manifest
3✔
23
from zds.tutorialv2.signals import content_unpublished
3✔
24
from zds.tutorialv2.utils import export_content
3✔
25
from zds.forum.utils import send_post, lock_topic
3✔
26
from zds.utils.templatetags.emarkdown import render_markdown
3✔
27
from zds.utils.templatetags.smileys_def import SMILEYS_BASE_PATH, LICENSES_BASE_PATH
3✔
28

29
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Maps a lower-cased licence short code (the licence code with "CC" removed, as computed
# by the PDF publicator) to the file name of the matching licence logo.
licences = {
    "by-nc-nd": "by-nc-nd.svg",
    "by-nc-sa": "by-nc-sa.svg",
    "by-nc": "by-nc.svg",
    "by-nd": "by-nd.svg",
    "by-sa": "by-sa.svg",
    "by": "by.svg",
    "0": "0.svg",
    "copyright": "copyright.svg",
}
40

41

42
def notify_update(db_object, is_update, is_major):
    """
    Send the ``content_published`` signal for first publications and major updates.

    :param db_object: the content that has just been published
    :param is_update: whether the publication updates an already published content
    :param is_major: whether the update is a major one
    """
    minor_update = is_update and not is_major
    if minor_update:
        # minor updates are silent: nothing is sent
        return
    # Follow
    signals.content_published.send(sender=db_object.__class__, instance=db_object, by_email=False)
46

47

48
def publish_content(db_object, versioned, is_major_update=True):
    """
    Publish a given content.

    The public files are first built in a ``<slug>__building`` directory, then the
    ``PublishedContent`` row is created or updated, the built directory is copied to the
    production path and, depending on the ``extra_content_generation_policy`` setting, the
    extra contents are generated synchronously or requested through the watchdog publicator.

    .. note::
        create a manifest.json without the introduction and conclusion if not needed. Also remove the 'text' field
        of extracts.

    :param db_object: Database representation of the content
    :type db_object: zds.tutorialv2.models.database.PublishableContent
    :param versioned: version of the content to publish
    :type versioned: zds.tutorialv2.models.versioned.VersionedContent
    :param is_major_update: if set to `True`, will update the publication date
    :type is_major_update: bool
    :raise FailureDuringPublication: if something goes wrong
    :return: the published representation
    :rtype: zds.tutorialv2.models.database.PublishedContent
    """

    # NOTE(review): PublishedContent is also imported at module level; this local import looks redundant.
    from zds.tutorialv2.models.database import PublishedContent

    if is_major_update:
        versioned.pubdate = datetime.now()

    # First write the files to a temporary directory: if anything goes wrong,
    # the last published version is not impacted !
    tmp_path = path.join(settings.ZDS_APP["content"]["repo_public_path"], versioned.slug + "__building")
    if path.exists(tmp_path):
        shutil.rmtree(tmp_path)  # remove previous attempt, if any

    # render HTML:
    # work on a deep copy so that the caller's versioned object is not altered by the rendering
    altered_version = copy.deepcopy(versioned)
    char_count = publish_use_manifest(db_object, tmp_path, altered_version)
    altered_version.dump_json(path.join(tmp_path, "manifest.json"))

    # make room for 'extra contents'
    build_extra_contents_path = path.join(tmp_path, settings.ZDS_APP["content"]["extra_contents_dirname"])
    makedirs(build_extra_contents_path)
    base_name = path.join(build_extra_contents_path, versioned.slug)

    # 1. markdown file (base for the others) :
    # If we come from a command line, we need to activate i18n, to have the date in the french language.
    # NOTE(review): cur_language is not read anywhere else in this function.
    cur_language = translation.get_language()
    altered_version.pubdate = datetime.now()

    md_file_path = base_name + ".md"
    # best effort: any OSError raised while creating the images directory is ignored
    with contextlib.suppress(OSError):
        Path(Path(md_file_path).parent, "images").mkdir()
    is_update = False

    if db_object.public_version:
        # already published once: reuse (or replace, on slug change) the existing public version
        public_version = update_existing_publication(db_object, versioned)
        is_update = True
    else:
        public_version = PublishedContent()

    # make the new public version
    public_version.content_public_slug = versioned.slug
    public_version.content_type = versioned.type
    public_version.content_pk = db_object.pk
    public_version.content = db_object
    public_version.must_reindex = True
    public_version.char_count = char_count
    public_version.save()
    with contextlib.suppress(FileExistsError):
        makedirs(public_version.get_extra_contents_directory())
    # first publications and major updates set the publication date, other updates only the update date
    if is_major_update or not is_update:
        public_version.publication_date = datetime.now()
    elif is_update:
        public_version.update_date = datetime.now()
    public_version.sha_public = versioned.current_version
    public_version.save(update_fields=["publication_date", "update_date", "sha_public"])

    # replace the authors of the public version by the current authors of the content
    public_version.authors.clear()
    for author in db_object.authors.all():
        public_version.authors.add(author)

    # this puts the manifest.json and base json file on the prod path.
    shutil.rmtree(public_version.get_prod_path(), ignore_errors=True)
    shutil.copytree(tmp_path, public_version.get_prod_path())
    # NOTE(review): db_object is only modified in memory here; it is not saved by this function.
    db_object.sha_public = versioned.current_version
    public_version.save()
    if settings.ZDS_APP["content"]["extra_content_generation_policy"] == "SYNC":
        # ok, now we can really publish the thing!
        generate_external_content(base_name, build_extra_contents_path, md_file_path, versioned=versioned)
    elif settings.ZDS_APP["content"]["extra_content_generation_policy"] == "WATCHDOG":
        # silently_pass=False makes the watchdog publicator actually record the requests
        PublicatorRegistry.get("watchdog").publish(md_file_path, base_name, silently_pass=False)

    return public_version
137

138

139
def update_existing_publication(db_object, versioned):
    """
    Prepare the public version of an already published content for a new publication.

    When the slug changed, the current public version is flagged for redirection and a
    fresh ``PublishedContent`` (keeping the previous publication date) takes its place on
    ``db_object``. In every case the previously published files are removed.

    :param db_object: database representation of the content, already published once
    :param versioned: version of the content about to be published
    :return: the public version object to fill with the new publication
    """
    current = db_object.public_version
    # the content has been published in the past, so the old files will be cleaned up
    stale_path = current.get_prod_path()

    slug_changed = versioned.slug != current.content_public_slug
    if slug_changed:
        # a new object is created instead of reusing the old one: the old one is kept
        # to handle permanent redirection, so that SEO is not impacted
        current.must_redirect = True
        current.save(update_fields=["must_redirect"])
        previous_publication_date = current.publication_date
        replacement = PublishedContent()
        db_object.public_version = replacement
        current = db_object.public_version

        # the content is already published: keep the same publication date
        current.publication_date = previous_publication_date

    # old files are removed only once everything above succeeded: a published
    # content without content must be avoided
    module_logger = logging.getLogger(__name__)
    module_logger.debug("erase " + stale_path)
    shutil.rmtree(stale_path)

    return current
162

163

164
def write_md_file(md_file_path, parsed_with_local_images, versioned):
    """
    Write the flat markdown export to ``md_file_path``, encoded in UTF-8.

    :param md_file_path: path of the markdown file to (over)write
    :param parsed_with_local_images: the markdown text to write
    :param versioned: the content being published, its title is used in the error log
    :raise FailureDuringPublication: if the text cannot be encoded in UTF-8
    """
    with open(md_file_path, "w", encoding="utf-8") as output:
        try:
            output.write(parsed_with_local_images)
        except UnicodeError:
            logger.error("Could not encode %s in UTF-8, publication aborted", versioned.title)
            user_message = _(
                "Une erreur est survenue durant la génération du fichier markdown "
                "à télécharger, vérifiez le code markdown"
            )
            raise FailureDuringPublication(user_message)
176

177

178
def generate_external_content(
    base_name, extra_contents_path, md_file_path, overload_settings=False, excluded=None, **kwargs
):
    """
    Generate all static files that allow offline access to the content.

    Every registered publicator that is not excluded is run in turn. A publicator failing
    with ``FailureDuringPublication`` or ``OSError`` is logged and does not prevent the
    following ones from running.

    :param base_name: base name of the files (without extension)
    :param extra_contents_path: internal directory where all files will be pushed
    :param md_file_path: bundled markdown file path
    :param overload_settings: forces the generation of all registered formats, even when the settings \
    ask for PDF not to be published
    :param excluded: list of excluded formats; ``None`` or an empty list excludes only "watchdog". \
    The given list is never modified.
    :param kwargs: extra options forwarded to each publicator
    """
    # Work on a copy: "pdf" may be appended below and the caller's list must not be mutated.
    excluded = list(excluded) if excluded else ["watchdog"]
    if not settings.ZDS_APP["content"]["build_pdf_when_published"] and not overload_settings:
        excluded.append("pdf")
    for publicator_name, publicator in PublicatorRegistry.get_all_registered(excluded):
        try:
            publicator.publish(
                md_file_path,
                base_name,
                change_dir=extra_contents_path,
                cur_language=translation.get_language(),
                **kwargs,
            )
        except (FailureDuringPublication, OSError):
            # best effort: one failing format must not block the other ones
            logging.getLogger(__name__).exception(
                "Could not publish %s format from %s base.", publicator_name, md_file_path
            )
207

208

209
class PublicatorRegistry:
    """
    Keep track of every publicator, as a 'human-readable name' -> publicator instance mapping.
    """

    registry = {}

    @classmethod
    def register(cls, publicator_name, *args):
        """
        Build a class decorator storing an instance of the decorated class under ``publicator_name``.

        :param publicator_name: name under which the instance is registered
        :param args: positional arguments used to instantiate the decorated class
        :return: a decorator returning the decorated object unchanged
        """

        def _store_instance(publicator_factory):
            instance = publicator_factory(*args)
            cls.registry[publicator_name] = instance
            return publicator_factory

        return _store_instance

    @classmethod
    def get_all_registered(cls, exclude=None):
        """
        Iterate over the registered publicators: zip, md, html, epub and pdf first
        (in that order), then every other one.

        :param exclude: a list of excluded publicator names, ``None`` to exclude nothing
        :return: a generator of ``(name, publicator)`` tuples
        """
        if exclude is None:
            exclude = []
        known_ranks = dict(zip(("zip", "md", "html", "epub", "pdf"), range(1, 6)))

        def _rank(entry):
            # unknown formats come after all the known ones
            return known_ranks.get(entry[0], 42)

        for name, publicator in sorted(cls.registry.items(), key=_rank):
            if name in exclude:
                continue
            yield name, publicator

    @classmethod
    def unregister(cls, name):
        """
        Remove registered Publicator named 'name' if present

        :param name: publicator name.
        """
        cls.registry.pop(name, None)

    @classmethod
    def get(cls, name):
        """
        Get publicator named 'name'.

        :param name: publicator name
        :return: the wanted publicator
        :rtype: Publicator
        :raise KeyError: if publicator is not registered
        """
        return cls.registry[name]
266

267

268
class Publicator:
    """
    Base class of the publicators: ``publish`` must be overridden by every subclass.
    """

    def publish(self, md_file_path, base_name, **kwargs):
        """
        Generate one export of a content.

        :param md_file_path: base markdown file path
        :param base_name: file name without extension
        :param kwargs: other publicator dependent options
        :raise NotImplementedError: always, in this base class
        """
        raise NotImplementedError()

    def get_published_content_entity(self, md_file_path) -> PublishedContent:
        """
        Find the database entity matching a markdown file path.

        :param md_file_path: markdown file path as string
        :type md_file_path: str
        :return: the first ``PublishedContent`` whose public slug matches the path, ``None`` if there is none
        :rtype: zds.tutorialv2.models.models_database.PublishedContent
        """
        slug = PublishedContent.get_slug_from_file_path(md_file_path)
        return PublishedContent.objects.filter(content_public_slug=slug).first()
295

296

297
@PublicatorRegistry.register("md")
class MarkdownPublicator(Publicator):
    """Publicator writing the flat markdown export of a content."""

    def publish(self, md_file_path, base_name, *, cur_language=settings.LANGUAGE_CODE, **kwargs):
        """
        Render the content with the markdown export template and write it to ``md_file_path``.

        :param md_file_path: path of the markdown file to write
        :param base_name: file name without extension (not used by this publicator)
        :param cur_language: language re-activated once the rendering is done
        :param kwargs: may hold ``versioned``, the version to render; when absent or falsy the \
        public version of the published content is loaded
        :raise FailureDuringPublication: if the rendering fails with an HTTP error
        """
        entity = self.get_published_content_entity(md_file_path)
        versioned = kwargs.pop("versioned", None)
        if not versioned:
            # load_public_version is not used here: it lacks the information needed to get
            # the content, only the titles would be available, without the text
            publishable = entity.content
            versioned = publishable.load_version(sha=entity.sha_public)

        template_context = {"content": versioned}
        try:
            # the export is always rendered with the language of the settings
            translation.activate(settings.LANGUAGE_CODE)
            rendered = render_to_string("tutorialv2/export/content.md", template_context)
        except requests.exceptions.HTTPError:
            raise FailureDuringPublication("Could not publish flat markdown")
        finally:
            translation.activate(cur_language)

        write_md_file(md_file_path, rendered, versioned)
        if "__building" not in md_file_path:
            return
        # the file was written in the building directory: copy it to the matching final path
        final_md_path = md_file_path.replace("__building", "")
        shutil.copy2(md_file_path, final_md_path)
317

318

319
def _read_flat_markdown(md_file_path):
    """
    Read a flat markdown file.

    :param md_file_path: path of the UTF-8 encoded markdown file
    :return: the whole content of the file, as a string
    """
    with open(md_file_path, encoding="utf-8") as source:
        return source.read()
323

324

325
@PublicatorRegistry.register("zip")
class ZipPublicator(Publicator):
    """Publicator building the zip archive of a published content."""

    def publish(self, md_file_path, base_name, **kwargs):
        """
        Create the zip archive of the content matching ``md_file_path``.

        Nothing is generated for an opinion when the ``allow_zip`` setting of opinions is off.

        :param md_file_path: base markdown file path, used to find the published content
        :param base_name: file name without extension (not used by this publicator)
        :param kwargs: ignored
        :raise FailureDuringPublication: if the published content is not found or the archive cannot be written
        """
        try:
            entity = self.get_published_content_entity(md_file_path)
            if entity is None:
                raise ValueError("published_content_entity is None")
            zip_forbidden = entity.content.type == "OPINION" and not settings.ZDS_APP["opinions"]["allow_zip"]
            if zip_forbidden:
                logger.info("ZIP not allowed for opinions.")
                return
            # the archive is dumped straight into the public directory: no need to move it afterwards
            make_zip_file(entity)
        except (OSError, ValueError) as e:
            raise FailureDuringPublication("Zip could not be created", e)
339

340

341
@PublicatorRegistry.register("pdf")
class ZMarkdownRebberLatexPublicator(Publicator):
    """
    Use zmarkdown and rebber stringifier to produce latex & pdf output.
    """

    def __init__(self, extension=".pdf", latex_classes=""):
        """
        :param extension: extension of the produced document, with its leading dot
        :param latex_classes: extra options appended to the content type given to the LaTeX template
        """
        self.extension = extension
        # the document type is the extension without its leading dot
        self.doc_type = extension[1:]
        self.latex_classes = latex_classes

    def publish(self, md_file_path, base_name, **kwargs):
        """
        Render the public version of the content to a LaTeX file, compile it with lualatex and
        copy both the ``.tex`` and the resulting file to the extra contents directory.

        Nothing is generated for an opinion when the ``allow_pdf`` setting of opinions is off.

        :param md_file_path: base markdown file path, used to find the published content
        :param base_name: path of the generated files, without extension
        :param kwargs: ignored
        :raise FailureDuringPublication: if the markdown is not parsed or if the compilation fails
        """
        published_content_entity = self.get_published_content_entity(md_file_path)
        if published_content_entity.content.type == "OPINION" and not settings.ZDS_APP["opinions"]["allow_pdf"]:
            logger.info("PDF not allowed for opinions")
            return
        gallery_pk = published_content_entity.content.gallery.pk
        depth_to_size_map = {
            1: "small",  # in fact this is an "empty" tutorial (i.e it is empty or has intro and/or conclusion)
            2: "small",
            3: "middle",
            4: "big",
        }
        public_versionned_source = published_content_entity.content.load_version(
            sha=published_content_entity.sha_public
        )
        base_directory = Path(base_name).parent
        image_dir = base_directory / "images"
        with contextlib.suppress(FileExistsError):
            image_dir.mkdir(parents=True)
        # copy the images of the gallery next to the LaTeX file, ignoring the ones that cannot be copied
        if (settings.MEDIA_ROOT / "galleries" / str(gallery_pk)).exists():
            for image in (settings.MEDIA_ROOT / "galleries" / str(gallery_pk)).iterdir():
                with contextlib.suppress(OSError):
                    shutil.copy2(str(image.absolute()), str(image_dir))
        content_type = depth_to_size_map[public_versionned_source.get_tree_level()]
        if self.latex_classes:
            content_type += ", " + self.latex_classes
        title = published_content_entity.title()
        authors = [a.username for a in published_content_entity.authors.all()]

        licence = published_content_entity.content.licence.code
        licence_short = licence.replace("CC", "").strip().lower()
        licence_logo = licences.get(licence_short, False)
        if licence_logo:
            licence_url = f"https://creativecommons.org/licenses/{licence_short}/4.0/legalcode"
            # we need a specific case for CC-0 as it is "public-domain"
            if licence_logo == licences["0"]:
                licence_url = "https://creativecommons.org/publicdomain/zero/1.0/legalcode.fr"
        else:
            # unknown licence: fall back to "all rights reserved"
            licence = str(_("Tous droits réservés"))
            licence_logo = licences["copyright"]
            licence_url = ""

        replacement_image_url = str(settings.MEDIA_ROOT.parent)
        if not replacement_image_url.endswith("/"):
            replacement_image_url += "/"
        replaced_media_url = settings.MEDIA_URL
        if replaced_media_url.startswith("/"):
            replaced_media_url = replaced_media_url[1:]
        exported = export_content(public_versionned_source, with_text=True, ready_to_publish_only=True)
        # no title to avoid zmd to put it on the final latex
        del exported["title"]
        content, metadata, messages = render_markdown(
            exported,
            output_format="texfile",
            # latex template arguments
            content_type=content_type,
            title=title,
            authors=authors,
            license=licence,
            license_directory=str(LICENSES_BASE_PATH),
            license_logo=licence_logo,
            license_url=licence_url,
            smileys_directory=str(SMILEYS_BASE_PATH / "svg"),
            images_download_dir=str(base_directory / "images"),
            local_url_to_local_path=["/", replacement_image_url],
            heading_shift=-1,
            date=date(published_content_entity.last_publication_date, "l d F Y"),
        )
        if content == "" and messages:
            raise FailureDuringPublication(f"Markdown was not parsed due to {messages}")
        zmd_class_dir_path = Path(settings.ZDS_APP["content"]["latex_template_repo"])
        # NOTE(review): the result of this replace() is discarded, so the call has no effect on
        # ``content``. Left as is on purpose: assigning it would change the paths written in the
        # LaTeX file - confirm which behavior is wanted.
        content.replace(replacement_image_url + replaced_media_url, replacement_image_url)
        if zmd_class_dir_path.exists() and zmd_class_dir_path.is_dir():
            # link the LaTeX template files next to the document; stops at the first link that already exists
            with contextlib.suppress(FileExistsError):
                zmd_class_link = base_directory / "zmdocument.cls"
                zmd_class_link.symlink_to(zmd_class_dir_path / "zmdocument.cls")
                luatex_dir_link = base_directory / "utf8.lua"
                luatex_dir_link.symlink_to(zmd_class_dir_path / "utf8.lua", target_is_directory=True)
        true_latex_extension = ".".join(self.extension.split(".")[:-1]) + ".tex"
        latex_file_path = base_name + true_latex_extension
        pdf_file_path = base_name + self.extension
        default_logo_original_path = Path(__file__).parent / ".." / ".." / "assets" / "images" / "logo@2x.png"
        with contextlib.suppress(FileExistsError):
            shutil.copy(str(default_logo_original_path), str(base_directory / "default_logo.png"))
        with open(latex_file_path, mode="w", encoding="utf-8") as latex_file:
            latex_file.write(content)
        shutil.copy2(latex_file_path, published_content_entity.get_extra_contents_directory())

        # two draft passes, then the glossary, then the final pass
        self.full_tex_compiler_call(latex_file_path, draftmode="-draftmode")
        self.full_tex_compiler_call(latex_file_path, draftmode="-draftmode")
        self.make_glossary(base_name.split("/")[-1], latex_file_path)
        self.full_tex_compiler_call(latex_file_path)

        shutil.copy2(pdf_file_path, published_content_entity.get_extra_contents_directory())

    def full_tex_compiler_call(self, latex_file, draftmode: str = ""):
        """
        Run the LaTeX compiler once and raise if the expected output file does not exist afterwards.

        :param latex_file: path of the ``.tex`` file
        :param draftmode: extra command line flag given to the compiler
        :raise FailureDuringPublication: if the output file is missing after the run
        """
        success_flag = self.tex_compiler(latex_file, draftmode)
        if not success_flag:
            handle_tex_compiler_error(latex_file, self.extension)

    def handle_makeglossaries_error(self, latex_file):
        """
        Raise ``FailureDuringPublication`` with the "fatal" and "error" lines of the LaTeX log.

        :param latex_file: path of the ``.tex`` file, the log is expected next to it with a ``.log`` extension
        :raise FailureDuringPublication: always
        """
        with open(path.splitext(latex_file)[0] + ".log") as latex_log:
            # The generator already filters the lines: wrapping it in filter() with a single
            # argument raised a TypeError instead of reporting the errors.
            errors = "\n".join(line for line in latex_log if "fatal" in line.lower() or "error" in line.lower())
        raise FailureDuringPublication(errors)

    def tex_compiler(self, texfile, draftmode: str = ""):
        """
        Run lualatex on ``texfile``, from the directory of the file.

        :param texfile: path of the ``.tex`` file
        :param draftmode: extra command line flag inserted before the file name
        :return: ``True`` if the output file (``texfile`` with ``self.extension``) exists after the run
        :rtype: bool
        """
        # NOTE(review): the command is built as a shell string; texfile is interpolated unquoted.
        command = f"lualatex -shell-escape -interaction=nonstopmode {draftmode} {texfile}"
        command_process = subprocess.Popen(
            command, shell=True, cwd=path.dirname(texfile), stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        # let's put 10 min of timeout because we do not generate latex everyday
        command_process.communicate(timeout=600)
        # sentry is optional: the breadcrumb is skipped when the SDK is not installed
        with contextlib.suppress(ImportError):
            import sentry_sdk

            sentry_sdk.add_breadcrumb(message="lualatex call", data=command, type="cmd")

        pdf_file_path = path.splitext(texfile)[0] + self.extension
        return path.exists(pdf_file_path)

    def make_glossary(self, basename, texfile):
        """
        Run makeglossaries from the directory of ``texfile``.

        :param basename: name given to makeglossaries
        :param texfile: path of the ``.tex`` file
        :return: ``True`` when neither output stream mentions "fatal"
        :raise FailureDuringPublication: otherwise, through ``handle_makeglossaries_error``
        """
        command = f"makeglossaries {basename}"
        command_process = subprocess.Popen(
            command, shell=True, cwd=path.dirname(texfile), stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        std_out, std_err = command_process.communicate()
        with contextlib.suppress(ImportError):
            import sentry_sdk

            sentry_sdk.add_breadcrumb(message="makeglossaries call", data=command, type="cmd")
        # TODO: check makeglossary exit codes to see if we can enhance error detection
        if "fatal" not in std_out.decode("utf-8").lower() and "fatal" not in std_err.decode("utf-8").lower():
            return True

        self.handle_makeglossaries_error(texfile)
487

488

489
def handle_tex_compiler_error(latex_file_path, ext):
    """
    Collect the relevant part of the LaTeX log and raise ``FailureDuringPublication`` with it.

    The lines mentioning "fatal" or "error" are kept, each with the lines that follow it
    (at most 24). When the log cannot be found or decoded, a default message is used instead.

    :param latex_file_path: path of the ``.tex`` file, the log is looked for next to it
    :param ext: extension of the expected document, only used in the debug log
    :raise FailureDuringPublication: always
    """
    # TODO zmd: fix extension parsing
    log_file_path = latex_file_path[:-3] + "log"
    errors = [f"Error occured, log file {log_file_path} not found."]
    with contextlib.suppress(FileNotFoundError, UnicodeDecodeError):
        with Path(log_file_path).open(encoding="utf-8") as latex_log:
            context_size = 25
            kept = []
            # start far enough in the past so that nothing is kept before the first hit
            last_hit = -context_size
            for line_no, text in enumerate(latex_log):
                lowered = text.lower()
                if "fatal" in lowered or "error" in lowered:
                    last_hit = line_no
                elif line_no - last_hit >= context_size:
                    continue
                kept.append(text)

            errors = "\n".join(kept)
    logger.debug("%s ext=%s", errors, ext)
    # sentry is optional: the breadcrumb is skipped when the SDK is not installed
    with contextlib.suppress(ImportError):
        import sentry_sdk

        sentry_sdk.add_breadcrumb(message="luatex call", data=errors, type="cmd")

    raise FailureDuringPublication(errors)
513

514

515
@PublicatorRegistry.register("epub")
class ZMarkdownEpubPublicator(Publicator):
    """Publicator building the epub export of a published content."""

    def publish(self, md_file_path, base_name, **kwargs):
        """
        Build the epub file, then move it to the extra contents directory of the published content.

        Nothing is generated for an opinion when the ``allow_epub`` setting of opinions is off.

        :param md_file_path: base markdown file path, used to find the published content
        :param base_name: path of the generated file, without extension
        :param kwargs: ignored
        :raise FailureDuringPublication: if the generation fails with an ``OSError`` or an HTTP error
        """
        try:
            entity = self.get_published_content_entity(md_file_path)
            if entity.content.type == "OPINION" and not settings.ZDS_APP["opinions"]["allow_epub"]:
                logger.info("EPUB not allowed for opinions")
                return
            built_epub = Path(base_name + ".epub")
            logger.info("Start generating epub")
            build_ebook(entity, path.dirname(md_file_path), built_epub)
        except (OSError, requests.exceptions.HTTPError):
            raise FailureDuringPublication("Error while generating epub file.")

        # the generation succeeded: put the file at its final place
        logger.info(built_epub)
        destination = Path(entity.get_extra_contents_directory(), Path(built_epub.name))
        if destination.exists():
            # remove the previous export, if any
            os.remove(str(destination))
        if not destination.parent.exists():
            destination.parent.mkdir(parents=True)
        logger.info("created %s. moving it to %s", built_epub, entity.get_extra_contents_directory())
        shutil.move(str(built_epub), entity.get_extra_contents_directory())
539

540

541
@PublicatorRegistry.register("watchdog")
class WatchdogFilePublicator(Publicator):
    """Publicator recording publication requests as ``PublicationEvent`` rows instead of generating files."""

    def publish(self, md_file_path, base_name, silently_pass=True, **kwargs):
        """
        Request the generation of every format for the content matching ``md_file_path``.

        :param md_file_path: base markdown file path, used to find the published content
        :param base_name: file name without extension (not used by this publicator)
        :param silently_pass: when true (the default), nothing is done
        :param kwargs: ignored
        """
        if not silently_pass:
            self.publish_from_published_content(self.get_published_content_entity(md_file_path))

    def publish_from_published_content(self, published_content: PublishedContent):
        """
        Create one "REQUESTED" event per registered format, "watchdog" excluded.

        :param published_content: the published content whose exports are requested
        """
        for format_name, _publicator in PublicatorRegistry.get_all_registered(["watchdog"]):
            # Previous requests for this content and this format which are still pending
            # are replaced by the new one:
            pending_requests = PublicationEvent.objects.filter(
                state_of_processing="REQUESTED",
                published_object__content_pk=published_content.content_pk,
                format_requested=format_name,
            )
            pending_requests.delete()
            PublicationEvent.objects.create(
                state_of_processing="REQUESTED",
                published_object=published_content,
                format_requested=format_name,
            )
563

564

565
class FailureDuringPublication(Exception):
    """Error raised when a step of the publication process fails."""

    def __init__(self, *args, **kwargs):
        # nothing specific: everything is handed over to Exception
        Exception.__init__(self, *args, **kwargs)
570

571

572
def make_zip_file(published_content):
    """Create the zip archive extra content from the published content

    The archive is written to ``<extra contents directory>/<public slug>.zip`` and is
    always closed, even when filling it fails.

    :param published_content: a PublishedContent object
    :return: the path of the archive
    """

    # local import, as in the original code (presumably to avoid a circular import - TODO confirm)
    from zds.tutorialv2.views.archives import DownloadContent

    publishable = published_content.content
    # update SHA so that archive gets updated too (in memory only: nothing is saved here)
    publishable.sha_public = publishable.sha_draft
    file_path = path.join(
        published_content.get_extra_contents_directory(), published_content.content_public_slug + ".zip"
    )
    # the context manager closes the archive even if loading or inserting the content raises
    with zipfile.ZipFile(file_path, "w") as zip_file:
        versioned = publishable.load_version(None, True)
        DownloadContent.insert_into_zip(zip_file, versioned.repository.commit(versioned.current_version).tree)
    return file_path
592

593

594
def unpublish_content(db_object, moderator=None):
    """
    Remove the given content from the public view.

    The public version is deleted from the database, the content is updated so that it no
    longer references it, and the published files are removed from the production path.

    .. note::
        This will send content_unpublished event.

    :param db_object: Database representation of the content
    :type db_object: PublishableContent
    :param moderator: the staff user who triggered the unpublish action.
    :type moderator: django.contrib.auth.models.User
    :return: ``True`` if unpublished, ``False`` otherwise
    :rtype: bool
    """

    from zds.tutorialv2.models.database import PublishedContent

    # ObjectDoesNotExist and OSError raised anywhere below are swallowed: the function then returns False
    with contextlib.suppress(ObjectDoesNotExist, OSError):
        public_version = PublishedContent.objects.get(pk=db_object.public_version.pk)

        # NOTE(review): the comprehension iterates over a one-element list holding the queryset,
        # so the signal is sent once, with the class of the queryset as sender - confirm this is intended.
        results = [
            content_unpublished.send(
                sender=reaction.__class__, instance=db_object, target=ContentReaction, moderator=moderator, user=None
            )
            for reaction in [ContentReaction.objects.filter(related_content=db_object).all()]
        ]
        # NOTE(review): this goes through the root logger, not the module-level ``logger``.
        logging.debug("Nb_messages=%d, messages=%s", len(results), results)
        # remove public_version:
        public_version.delete()
        update_params = {"public_version": None}

        # opinions also lose their public sha, picked sha and publication date
        if db_object.is_opinion:
            update_params["sha_public"] = None
            update_params["sha_picked"] = None
            update_params["pubdate"] = None

        db_object.update(**update_params)
        content_unpublished.send(
            sender=db_object.__class__, instance=db_object, target=db_object.__class__, moderator=moderator
        )
        # clean files
        old_path = public_version.get_prod_path()
        public_version.content.update(public_version=None, sha_public=None)
        if path.exists(old_path):
            shutil.rmtree(old_path)
        return True

    return False
642

643

644
def close_article_beta(db_object, versioned, user, request=None):
    """
    Leave the beta of an article: reset its beta sha and, if its beta topic exists and is
    still open, post a closing message in it and lock it. Other content types are left untouched.

    :param db_object: the article
    :type db_object: zds.tutorialv2.models.database.PublishableContent
    :param versioned: the public version of the article, used to populate the closing post
    :type versioned: zds.tutorialv2.models.versioned.VersionedContent
    :param user: the current user, author of the closing post
    :param request: the current request
    """
    if db_object.type != "ARTICLE":
        return

    db_object.sha_beta = None
    beta_topic = db_object.beta_topic
    if beta_topic is None or beta_topic.is_locked:
        # no topic to close, or already closed
        return

    closing_message = render_to_string("tutorialv2/messages/beta_desactivate.md", {"content": versioned})
    send_post(request, beta_topic, user, closing_message)
    lock_topic(beta_topic)
661

662

663
def save_validation_state(
    db_object,
    is_update,
    published: PublishedContent,
    validation,
    versioned,
    source="",
    is_major=False,
    user=None,
    request=None,
    comment="",
):
    """
    Record a publication on the content and mark its validation as accepted.

    :param db_object: the content
    :type db_object: zds.tutorialv2.models.database.PublishableContent
    :param is_update: marks if the publication is an update or a new/major publication
    :param published: the PublishedContent instance
    :param validation: the related validation, its status becomes ACCEPT
    :param versioned: the VersionedContent related to the public sha
    :param source: the optional canonical link
    :param is_major: marks a major publication (first one, or new parts for example)
    :param user: validating user
    :param request: current request, forwarded when the beta of an article is closed
    :param comment: comment of the validator, stored on the validation
    """
    # 1. the content now points to its public version
    db_object.sha_public = validation.version
    db_object.source = source
    db_object.sha_validation = None
    db_object.public_version = published

    # first publications, major updates and contents without date get a new publication date
    needs_new_pubdate = is_major or not is_update or db_object.pubdate is None
    if needs_new_pubdate:
        db_object.pubdate = datetime.now()
        db_object.is_obsolete = False

    # 2. an article leaves the beta when published
    close_article_beta(db_object, versioned, user=user, request=request)
    db_object.save()

    # 3. the validation is accepted
    validation.comment_validator = comment
    validation.status = "ACCEPT"
    validation.date_validation = datetime.now()
    validation.save()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc