• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

collective / collective.documentgenerator / 20063417860

09 Dec 2025 12:26PM UTC coverage: 90.375% (-1.1%) from 91.463%
20063417860

push

github

web-flow
Merge pull request #70 from collective/MOD-1044/convert-odt-to-pdf

Mod 1044/convert odt to pdf

35 of 69 new or added lines in 2 files covered. (50.72%)

2169 of 2400 relevant lines covered (90.38%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.49
/src/collective/documentgenerator/utils.py
1
# -*- coding: utf-8 -*-
2
from appy.bin.odfclean import Cleaner
1✔
3
from appy.pod.lo_pool import LoPool
1✔
4
from appy.pod.renderer import Renderer
1✔
5
from collective.documentgenerator import _
1✔
6
from collective.documentgenerator import config
1✔
7
from imio.helpers.content import uuidToObject
1✔
8
from imio.helpers.security import fplog
1✔
9
from plone import api
1✔
10
from plone.dexterity.utils import createContentInContainer
1✔
11
from plone.namedfile.file import NamedBlobFile
1✔
12
from Products.CMFCore.utils import getToolByName
1✔
13
from Products.CMFPlone.utils import safe_unicode
1✔
14
from zope import i18n
1✔
15
from zope.annotation import IAnnotations
1✔
16
from zope.component import getMultiAdapter
1✔
17
from zope.component.hooks import getSite
1✔
18
from zope.component.hooks import setSite
1✔
19
from zope.interface import Interface
1✔
20
from zope.interface import Invalid
1✔
21
from zope.lifecycleevent import Attributes
1✔
22
from zope.lifecycleevent import modified
1✔
23

24
import hashlib
1✔
25
import logging
1✔
26
import os
1✔
27
import re
1✔
28
import tempfile
1✔
29

30

31
logger = logging.getLogger('collective.documentgenerator')
1✔
32

33

34
def translate(msgid, domain='collective.documentgenerator'):
1✔
35
    portal = api.portal.get()
1✔
36
    translation = i18n.translate(
1✔
37
        msgid,
38
        domain=domain,
39
        context=portal.REQUEST
40
    )
41
    return translation
1✔
42

43

44
def compute_md5(data):
1✔
45
    md5 = hashlib.md5(data).hexdigest()
1✔
46
    return md5
1✔
47

48

49
def update_templates(templates, profile='', force=False):
1✔
50
    """
51
        function to manage templates update.
52
        # see http://trac.imio.be/trac/ticket/9383 for full implementation
53
        :param list templates: list of tuples containing ('plone-template-path', 'os-template-path')
54
        :param str profile: profile path stored on template (or various identification)
55
        :param bool force: force overrides of templates
56
    """
57
    # Don't use profile now !
58
    portal = api.portal.getSite()
1✔
59
    ret = []
1✔
60
    for (ppath, ospath) in templates:
1✔
61
        ppath = ppath.strip('/ ')
1✔
62
        obj = portal.unrestrictedTraverse(ppath, default=None)
1✔
63
        if not obj:
1✔
64
            logger.warn("The plone template '%s' was not found" % ppath)
1✔
65
            ret.append((ppath, ospath, 'plone path error'))
1✔
66
            continue
1✔
67
        if not obj.odt_file:
1✔
68
            logger.warn("The plone template '%s' doesn't have odt file" % ppath)
×
69
            ret.append((ppath, ospath, 'no odt file'))
×
70
            continue
×
71
        if not os.path.exists(ospath):
1✔
72
            logger.warn("The template file '%s' doesn't exist" % ospath)
1✔
73
            ret.append((ppath, ospath, 'os path error'))
1✔
74
            continue
1✔
75
        with open(ospath, 'rb') as f:
1✔
76
            data = f.read()
1✔
77
            new_md5 = compute_md5(data)
1✔
78
            if obj.initial_md5 == new_md5:
1✔
79
                ret.append((ppath, ospath, 'unchanged'))
1✔
80
                continue
1✔
81
            elif obj.has_been_modified() and not force:
1✔
82
                ret.append((ppath, ospath, 'kept'))
1✔
83
                continue
1✔
84
            obj.initial_md5 = new_md5
1✔
85
            obj.style_modification_md5 = new_md5
1✔
86
            obj.odt_file.data = data
1✔
87
            modified(obj, Attributes(Interface, 'odt_file'))
1✔
88
            ret.append((ppath, ospath, 'replaced'))
1✔
89
    return ret
1✔
90

91

92
def update_dict_with_validation(original_dict, update_dict, error_message=_("Dict update collision on key")):
1✔
93
    for key in update_dict:
1✔
94
        if key in original_dict:
1✔
95
            raise Invalid(_("${error_message} for key = '${key}'",
1✔
96
                            mapping={'error_message': error_message, 'key': key}))
97

98
        original_dict[key] = update_dict[key]
1✔
99

100

101
def ulocalized_time(date, long_format=None, time_only=None, custom_format=None,
1✔
102
                    domain='plonelocales', target_language=None, context=None,
103
                    request=None, month_lc=True, day_lc=True):
104
    """
105
        Return for a datetime the string value with week and mont translated.
106
        Take into account %a, %A, %b, %B
107
    """
108
    if not custom_format:
1✔
109
        # use toLocalizedTime
110
        plone = getMultiAdapter((context, request), name=u'plone')
1✔
111
        formatted_date = plone.toLocalizedTime(date, long_format, time_only)
1✔
112
    else:
113
        from Products.CMFPlone.i18nl10n import monthname_msgid
1✔
114
        from Products.CMFPlone.i18nl10n import monthname_msgid_abbr
1✔
115
        from Products.CMFPlone.i18nl10n import weekdayname_msgid
1✔
116
        from Products.CMFPlone.i18nl10n import weekdayname_msgid_abbr
1✔
117
        if request is None:
1✔
118
            portal = api.portal.get()
×
119
            request = portal.REQUEST
×
120
        # first replace parts to translate
121
        custom_format = custom_format.replace('%%', '_p_c_')
1✔
122

123
        conf = {
1✔
124
            'a': {'fct': weekdayname_msgid_abbr, 'fmt': '%w', 'low': day_lc},
125
            'A': {'fct': weekdayname_msgid, 'fmt': '%w', 'low': day_lc},
126
            'b': {'fct': monthname_msgid_abbr, 'fmt': '%m', 'low': month_lc},
127
            'B': {'fct': monthname_msgid, 'fmt': '%m', 'low': month_lc},
128
        }
129
        matches = re.findall(r'%([aAbB])', custom_format)
1✔
130
        for match in sorted(set(matches)):
1✔
131
            # function( int(date.strftime(format) )
132
            msgid = conf[match]['fct'](int(date.strftime(conf[match]['fmt'])))
1✔
133
            repl = i18n.translate(msgid, domain, context=request, target_language=target_language)
1✔
134
            if conf[match]['low']:
1✔
135
                repl = repl.lower()
1✔
136
            custom_format = re.sub('%{}'.format(match), repl, custom_format)
1✔
137

138
        # then format date
139
        custom_format = custom_format.replace('_p_c_', '%%')
1✔
140
        formatted_date = date.strftime(custom_format.encode('utf8'))
1✔
141
    return safe_unicode(formatted_date)
1✔
142

143

144
def remove_tmp_file(filename):
1✔
145
    """Do not break if unable to remove temporary file, but log error if any."""
146
    try:
1✔
147
        os.remove(filename)
1✔
148
    except OSError:
1✔
149
        logger.warn("Could not remove temporary file at {0}".format(filename))
1✔
150

151

152
def update_oo_config():
1✔
153
    """ Update config following buildout var """
154
    key_template = 'collective.documentgenerator.browser.controlpanel.IDocumentGeneratorControlPanelSchema.{}'
1✔
155
    var = {'oo_server': 'OO_SERVER', 'oo_port_list': 'OO_PORT', 'uno_path': 'PYTHON_UNO'}
1✔
156
    for key in var.keys():
1✔
157
        full_key = key_template.format(key)
1✔
158
        configured_oo_option = api.portal.get_registry_record(full_key)
1✔
159
        env_value = os.getenv(var.get(key, 'NO_ONE'), None)
1✔
160
        if env_value:
1✔
161
            new_oo_option = type(configured_oo_option)(os.getenv(var.get(key, 'NO_ONE'), ''))
1✔
162
            if new_oo_option and new_oo_option != configured_oo_option:
1✔
163
                api.portal.set_registry_record(full_key, new_oo_option)
1✔
164
    logger.info("LibreOffice configuration updated for " + getSite().getId())
1✔
165

166

167
def update_oo_config_after_bigbang(event):
1✔
168
    setSite(event.object)
×
169
    try:
×
170
        update_oo_config()
×
171
    except Exception:
×
172
        logger.error("Update LibreOffice configuration failed", exc_info=1)
×
173

174

175
def get_site_root_relative_path(obj):
1✔
176
    return "/" + '/'.join(
1✔
177
        getToolByName(obj, 'portal_url').getRelativeContentPath(obj)
178
    )
179

180

181
def temporary_file_name(suffix=''):
1✔
182
    tmp_dir = os.getenv('CUSTOM_TMP', None)
1✔
183
    if tmp_dir and not os.path.exists(tmp_dir):
1✔
184
        os.mkdir(tmp_dir)
1✔
185
    return tempfile.mktemp(suffix=suffix, dir=tmp_dir)
1✔
186

187

188
def create_temporary_file(initial_file=None, base_name=''):
1✔
189
    tmp_filename = temporary_file_name(suffix=base_name)
1✔
190
    # create the file in any case
191
    with open(tmp_filename, 'w+') as tmp_file:
1✔
192
        if initial_file:
1✔
193
            tmp_file.write(initial_file.data)
1✔
194
    return tmp_file
1✔
195

196

197
def clean_notes(pod_template):
1✔
198
    """ Use appy.pod Cleaner to clean notes (comments). """
199
    cleaned = 0
1✔
200
    odt_file = pod_template.odt_file
1✔
201
    if odt_file:
1✔
202
        # write file to /tmp to be able to use appy.pod Cleaner
203
        tmp_file = create_temporary_file(odt_file, '-to-clean.odt')
1✔
204
        cleaner = Cleaner(path=tmp_file.name, verbose=1)
1✔
205
        cleaned = cleaner.run()
1✔
206
        if cleaned:
1✔
207
            manually_modified = pod_template.has_been_modified()
1✔
208
            with open(tmp_file.name, 'rb') as res_file:
1✔
209
                # update template
210
                result = NamedBlobFile(
1✔
211
                    data=res_file.read(),
212
                    contentType=odt_file.contentType,
213
                    filename=pod_template.odt_file.filename)
214
            pod_template.odt_file = result
1✔
215
            if not manually_modified:
1✔
216
                pod_template.style_modification_md5 = pod_template.current_md5
1✔
217
            extras = 'pod_template={0} cleaned_parts={1}'.format(
1✔
218
                repr(pod_template), cleaned)
219
            fplog('clean_notes', extras=extras)
1✔
220
        remove_tmp_file(tmp_file.name)
1✔
221

222
    return bool(cleaned)
1✔
223

224

225
def convert_odt(afile, output_name, fmt='pdf', **kwargs):
1✔
226
    """
227
    Convert an odt file to another format using appy.pod.
228

229
    :param afile: file field content like NamedBlobFile
230
    :param output_name: output name
231
    :param fmt: output format, default to 'pdf'
232
    :param kwargs: other parameters passed to Renderer, i.e pdfOptions='ExportNotes=True;SelectPdfVersion=1'
233
    """
234
    lo_pool = LoPool.get(
1✔
235
        python=config.get_uno_path(),
236
        server=config.get_oo_server(),
237
        port=config.get_oo_port_list(),
238
    )
239
    if not lo_pool:
1✔
NEW
240
        raise Exception("Could not find LibreOffice, check your configuration")
×
241

242
    temp_file = create_temporary_file(afile, '.odt')
1✔
243
    converted_filename = None
1✔
244
    try:
1✔
245
        renderer = Renderer(
1✔
246
            temp_file.name,
247
            afile,
248
            temporary_file_name(suffix=".{extension}".format(extension=fmt)),
249
            **kwargs
250
        )
251

252
        lo_pool(renderer, temp_file.name, fmt)
1✔
253
        converted_filename = temp_file.name.replace('.odt', '.{}'.format(fmt))
1✔
254
        if not os.path.exists(converted_filename):
1✔
NEW
255
            api.portal.show_message(
×
256
                message=_(u"Conversion failed, no converted file '{}'".format(safe_unicode(output_name))),
257
                request=getSite().REQUEST,
258
                type="error",
259
            )
NEW
260
            raise Invalid(u"Conversion failed, no converted file '{}'".format(safe_unicode(output_name)))
×
261
        with open(converted_filename, 'rb') as f:
1✔
262
            converted_file = f.read()
1✔
263
    finally:
264
        remove_tmp_file(temp_file.name)
1✔
265
        if converted_filename:
1✔
266
            remove_tmp_file(converted_filename)
1✔
267

268
    return output_name, converted_file
1✔
269

270

271
def convert_and_save_odt(afile, container, portal_type, output_name, fmt='pdf', from_uid=None, attributes=None,
1✔
272
                         **kwargs):
273
    """
274
    Convert an odt file to another format using appy.pod and save it in a NamedBlobFile.
275

276
    :param afile: file field content like NamedBlobFile
277
    :param container: container object to create new file
278
    :param portal_type: portal type
279
    :param output_name: output name
280
    :param fmt: output format, default to 'pdf'
281
    :param from_uid: uid from original file object
282
    :param attributes: dict of other attributes to set on created content
283
    :param kwargs: other parameters passed to Renderer, i.e pdfOptions='ExportNotes=True;SelectPdfVersion=1'
284
    """
NEW
285
    converted_filename, converted_file = convert_odt(afile, output_name, fmt=fmt, **kwargs)
×
NEW
286
    file_object = NamedBlobFile(converted_file, filename=safe_unicode(converted_filename))
×
NEW
287
    if attributes is None:
×
NEW
288
        attributes = {}
×
NEW
289
    attributes["conv_from_uid"] = from_uid
×
NEW
290
    new_file = createContentInContainer(
×
291
        container,
292
        portal_type,
293
        title=converted_filename,
294
        file=file_object,
295
        **attributes)
NEW
296
    if from_uid:
×
NEW
297
        annot = IAnnotations(new_file)
×
NEW
298
        annot["documentgenerator"] = {"conv_from_uid": from_uid}
×
NEW
299
    return new_file
×
300

301

302
@api.env.mutually_exclusive_parameters("document", "document_uid")
1✔
303
def need_mailing_value(document=None, document_uid=None):
1✔
NEW
304
    if not document:
×
NEW
305
        document = uuidToObject(document_uid, unrestricted=True)
×
NEW
306
    annot = IAnnotations(document)
×
NEW
307
    if "documentgenerator" in annot and annot["documentgenerator"].get("need_mailing", False):
×
NEW
308
        return True
×
NEW
309
    return False
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc