• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

collective / collective.documentgenerator / 20718532190

05 Jan 2026 02:29PM UTC coverage: 90.341% (-0.2%) from 90.506%
20718532190

push

github

chris-adam
Improved utils convert function to convert files other than odt

14 of 22 new or added lines in 2 files covered. (63.64%)

2 existing lines in 1 file now uncovered.

2198 of 2433 relevant lines covered (90.34%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.79
/src/collective/documentgenerator/utils.py
1
# -*- coding: utf-8 -*-
2
from appy.bin.odfclean import Cleaner
1✔
3
from collective.documentgenerator import _
1✔
4
from collective.documentgenerator import BLDT_DIR
1✔
5
from imio.helpers.content import uuidToObject
1✔
6
from imio.helpers.security import fplog
1✔
7
from imio.pyutils.system import runCommand
1✔
8
from plone import api
1✔
9
from plone.dexterity.utils import createContentInContainer
1✔
10
from plone.namedfile.file import NamedBlobFile
1✔
11
from Products.CMFCore.utils import getToolByName
1✔
12
from Products.CMFPlone.utils import safe_unicode
1✔
13
from zope import i18n
1✔
14
from zope.annotation import IAnnotations
1✔
15
from zope.component import getMultiAdapter
1✔
16
from zope.component.hooks import getSite
1✔
17
from zope.component.hooks import setSite
1✔
18
from zope.interface import Interface
1✔
19
from zope.interface import Invalid
1✔
20
from zope.lifecycleevent import Attributes
1✔
21
from zope.lifecycleevent import modified
1✔
22

23
import hashlib
1✔
24
import logging
1✔
25
import os
1✔
26
import re
1✔
27
import tempfile
1✔
28

29

30
# Module-level logger used by the helpers of this file to report warnings and errors.
logger = logging.getLogger('collective.documentgenerator')
1✔
31

32

33
def translate(msgid, domain='collective.documentgenerator'):
    """
    Translate a message id, using the portal request as translation context.

    :param msgid: message id to translate
    :param domain: i18n domain, default to 'collective.documentgenerator'
    :return: the value returned by i18n.translate
    """
    request = api.portal.get().REQUEST
    return i18n.translate(msgid, domain=domain, context=request)
1✔
41

42

43
def compute_md5(data):
    """
    Return the hexadecimal MD5 digest of the given data.

    :param data: byte string to hash
    :return: digest as a hexadecimal string
    """
    digest = hashlib.md5()
    digest.update(data)
    return digest.hexdigest()
1✔
46

47

48
def update_templates(templates, profile='', force=False):
    """
        function to manage templates update.
        # see http://trac.imio.be/trac/ticket/9383 for full implementation
        :param list templates: list of tuples containing ('plone-template-path', 'os-template-path')
        :param str profile: profile path stored on template (or various identification)
        :param bool force: force overrides of templates
        :return: list of tuples ('plone-template-path', 'os-template-path', status) where status is
                 'plone path error', 'no odt file', 'os path error', 'unchanged', 'kept' or 'replaced'
    """
    # Don't use profile now !
    # getSite is the hook imported at the top of this module; no need to reach for it through plone.api
    portal = getSite()
    ret = []
    for (ppath, ospath) in templates:
        ppath = ppath.strip('/ ')
        obj = portal.unrestrictedTraverse(ppath, default=None)
        if not obj:
            logger.warning("The plone template '%s' was not found", ppath)
            ret.append((ppath, ospath, 'plone path error'))
            continue
        if not obj.odt_file:
            logger.warning("The plone template '%s' doesn't have odt file", ppath)
            ret.append((ppath, ospath, 'no odt file'))
            continue
        if not os.path.exists(ospath):
            logger.warning("The template file '%s' doesn't exist", ospath)
            ret.append((ppath, ospath, 'os path error'))
            continue
        # only keep the file open while reading it
        with open(ospath, 'rb') as f:
            data = f.read()
        new_md5 = compute_md5(data)
        if obj.initial_md5 == new_md5:
            ret.append((ppath, ospath, 'unchanged'))
            continue
        if obj.has_been_modified() and not force:
            # the template was changed in the site: keep it unless the update is forced
            ret.append((ppath, ospath, 'kept'))
            continue
        obj.initial_md5 = new_md5
        obj.style_modification_md5 = new_md5
        obj.odt_file.data = data
        modified(obj, Attributes(Interface, 'odt_file'))
        ret.append((ppath, ospath, 'replaced'))
    return ret
1✔
89

90

91
def update_dict_with_validation(original_dict, update_dict, error_message=_("Dict update collision on key")):
    """
    Add every item of update_dict to original_dict, refusing to override an existing key.

    All keys are checked before anything is written, so original_dict is left
    untouched when a collision is found.

    :param original_dict: mapping updated in place
    :param update_dict: mapping holding the items to add
    :param error_message: message used as prefix of the raised error
    :raises Invalid: when a key of update_dict is already in original_dict
    """
    for key in update_dict:
        if key in original_dict:
            raise Invalid(_("${error_message} for key = '${key}'",
                            mapping={'error_message': error_message, 'key': key}))

    for key in update_dict:
        original_dict[key] = update_dict[key]
1✔
98

99

100
def ulocalized_time(date, long_format=None, time_only=None, custom_format=None,
                    domain='plonelocales', target_language=None, context=None,
                    request=None, month_lc=True, day_lc=True):
    """
        Return for a datetime the string value with week day and month translated.

        Without custom_format, the 'plone' view toLocalizedTime does the formatting.
        With custom_format, %a, %A, %b and %B are replaced by their translation
        (lowercased when day_lc / month_lc is set) before strftime is applied.
    """
    if not custom_format:
        # use toLocalizedTime
        plone_view = getMultiAdapter((context, request), name=u'plone')
        return safe_unicode(plone_view.toLocalizedTime(date, long_format, time_only))

    from Products.CMFPlone.i18nl10n import monthname_msgid
    from Products.CMFPlone.i18nl10n import monthname_msgid_abbr
    from Products.CMFPlone.i18nl10n import weekdayname_msgid
    from Products.CMFPlone.i18nl10n import weekdayname_msgid_abbr
    if request is None:
        request = api.portal.get().REQUEST

    # directive letter -> (msgid function, strftime code giving its integer argument, lowercase flag)
    directives = {
        'a': (weekdayname_msgid_abbr, '%w', day_lc),
        'A': (weekdayname_msgid, '%w', day_lc),
        'b': (monthname_msgid_abbr, '%m', month_lc),
        'B': (monthname_msgid, '%m', month_lc),
    }
    # hide escaped percent signs while the directives are replaced
    fmt = custom_format.replace('%%', '_p_c_')
    for letter in sorted(set(re.findall(r'%([aAbB])', fmt))):
        msgid_function, number_code, lowercase = directives[letter]
        msgid = msgid_function(int(date.strftime(number_code)))
        translated = i18n.translate(msgid, domain, context=request, target_language=target_language)
        if lowercase:
            translated = translated.lower()
        fmt = re.sub('%{}'.format(letter), translated, fmt)

    # then format date
    fmt = fmt.replace('_p_c_', '%%')
    return safe_unicode(date.strftime(fmt.encode('utf8')))
1✔
141

142

143
def remove_tmp_file(filename):
    """
    Do not break if unable to remove temporary file, but log error if any.

    :param filename: path of the file to remove
    """
    try:
        os.remove(filename)
    except OSError:
        # best effort: a leftover temporary file must not break the caller
        logger.warning("Could not remove temporary file at %s", filename)
×
149

150

151
def update_oo_config():
    """
    Update the LibreOffice registry records from environment variables.

    A record is written only when its environment variable is set and the value,
    converted to the type of the current record value, is not empty and differs
    from the current one.
    """
    record_template = 'collective.documentgenerator.browser.controlpanel.IDocumentGeneratorControlPanelSchema.{}'
    env_names = {'oo_server': 'OO_SERVER', 'oo_port_list': 'OO_PORT', 'uno_path': 'PYTHON_UNO'}
    for option, env_name in env_names.items():
        record_name = record_template.format(option)
        current_value = api.portal.get_registry_record(record_name)
        raw_value = os.getenv(env_name, None)
        if not raw_value:
            continue
        # environment values are strings: convert to the type stored in the registry
        new_value = type(current_value)(raw_value)
        if new_value and new_value != current_value:
            api.portal.set_registry_record(record_name, new_value)
    logger.info("LibreOffice configuration updated for " + getSite().getId())
1✔
164

165

166
def update_oo_config_after_bigbang(event):
    """
    Event handler setting event.object as current site before updating the LibreOffice configuration.

    An error raised by the update is logged with its traceback and not propagated.

    :param event: event whose ``object`` attribute is the site to configure
    """
    setSite(event.object)
    try:
        update_oo_config()
    except Exception:
        logger.exception("Update LibreOffice configuration failed")
×
172

173

174
def get_site_root_relative_path(obj):
    """
    Return the path of obj as '/' followed by the '/'-joined result of portal_url.getRelativeContentPath.

    :param obj: object to get the path of
    """
    portal_url = getToolByName(obj, 'portal_url')
    path_parts = portal_url.getRelativeContentPath(obj)
    return "/" + '/'.join(path_parts)
178

179

180
def temporary_file_name(suffix=''):
    """
    Return a temporary file path without creating the file.

    The directory is the one given by the CUSTOM_TMP environment variable when set
    (created with its missing parents if needed), else the default of the tempfile module.

    :param suffix: suffix of the file name
    :return: path returned by tempfile.mktemp
    """
    tmp_dir = os.getenv('CUSTOM_TMP', None)
    if tmp_dir and not os.path.exists(tmp_dir):
        try:
            # makedirs also handles a CUSTOM_TMP whose parent directories are missing
            os.makedirs(tmp_dir)
        except OSError:
            # the directory may have been created meanwhile by another process
            if not os.path.isdir(tmp_dir):
                raise
    # NOTE(review): tempfile.mktemp is documented as unsafe (the name can be taken by someone else
    # before the file is created); moving to mkstemp would create the file, to be evaluated with callers.
    return tempfile.mktemp(suffix=suffix, dir=tmp_dir)
1✔
185

186

187
def create_temporary_file(initial_file=None, base_name=''):
    """
    Create a temporary file, filled with the data of initial_file when given.

    :param initial_file: object whose ``data`` attribute is written in the file, None for an empty file
    :param base_name: suffix of the temporary file name
    :return: the file object, already closed; its ``name`` attribute is the path of the file
    """
    tmp_filename = temporary_file_name(suffix=base_name)
    # create the file in any case
    # binary mode so the data is written untouched
    # (assumes initial_file.data is a byte string, as for a NamedBlobFile - to confirm)
    with open(tmp_filename, 'wb') as tmp_file:
        if initial_file:
            tmp_file.write(initial_file.data)
    return tmp_file
1✔
194

195

196
def clean_notes(pod_template):
    """
    Use appy.pod Cleaner to clean notes (comments).

    :param pod_template: template whose ``odt_file`` is cleaned and replaced when something was removed
    :return: True if the cleaner reported cleaned parts, False otherwise (also when there is no odt file)
    """
    odt_file = pod_template.odt_file
    if not odt_file:
        return False

    # write file to /tmp to be able to use appy.pod Cleaner
    tmp_file = create_temporary_file(odt_file, '-to-clean.odt')
    try:
        cleaner = Cleaner(path=tmp_file.name, verbose=1)
        cleaned = cleaner.run()
        if cleaned:
            # must be read before the file is replaced
            manually_modified = pod_template.has_been_modified()
            with open(tmp_file.name, 'rb') as res_file:
                # update template
                result = NamedBlobFile(
                    data=res_file.read(),
                    contentType=odt_file.contentType,
                    filename=pod_template.odt_file.filename)
            pod_template.odt_file = result
            if not manually_modified:
                pod_template.style_modification_md5 = pod_template.current_md5
            extras = 'pod_template={0} cleaned_parts={1}'.format(
                repr(pod_template), cleaned)
            fplog('clean_notes', extras=extras)
    finally:
        # remove the temporary file even when the cleaning or the update fails
        remove_tmp_file(tmp_file.name)

    return bool(cleaned)
1✔
222

223

224
def convert_file(afile, output_name, fmt='pdf'):
    """
    Convert a file to another libreoffice readable format using appy.pod

    :param afile: file field content like NamedBlobFile
    :param output_name: output name
    :param fmt: output format, default to 'pdf'
    :return: tuple (output_name, converted content); the content is an empty string when the
             conversion failed, in which case an error status message is shown and details are logged
    """
    from appy.pod import converter
    try:
        from shlex import quote  # Python 3
    except ImportError:
        from pipes import quote  # Python 2

    converter_path = converter.__file__
    if converter_path.endswith('.pyc'):
        converter_path = converter_path[:-1]
    # the extension is taken from afile.filename and ends up in a command line: keep only [a-z0-9]
    file_ext = re.sub(r'[^a-z0-9]', '', (afile.filename or '').split('.')[-1].lower())
    temp_file = create_temporary_file(afile, base_name='.{}'.format(file_ext) if file_ext else '')
    # only swap the extension of the file name, never another part of the path
    converted_filename = '{}.{}'.format(os.path.splitext(temp_file.name)[0], fmt)
    converted_file = ''
    try:
        command = "python3 {converter_path} {temp_file} {fmt}".format(
            converter_path=quote(converter_path), temp_file=quote(temp_file.name), fmt=quote(fmt))
        out, err, code = runCommand(command)
        # This command has no output on success
        if code != 0 or err or not os.path.exists(converted_filename):
            logger.error("Conversion to '%s' failed: code=%s out=%s err=%s expected file=%s",
                         fmt, code, out, err, converted_filename)
            # NOTE(review): the text is formatted before being given to _(), so the msgid changes
            # with output_name; kept as is to not alter the message.
            message = _(u"Conversion failed, no converted file '{}'".format(safe_unicode(output_name)))
            # given as is (no str()) so an output_name with non ascii characters is not a problem
            api.portal.show_message(message=message, request=getSite().REQUEST, type="error")
        else:
            with open(converted_filename, 'rb') as f:
                converted_file = f.read()
    except Exception as e:
        # best effort: report the problem to the user and return an empty content
        logger.exception("Unexpected error while converting '%s'", temp_file.name)
        api.portal.show_message(message=str(e), request=getSite().REQUEST, type="error")
    finally:
        remove_tmp_file(temp_file.name)
        if os.path.exists(converted_filename):
            remove_tmp_file(converted_filename)
    return output_name, converted_file
1✔
255

256

257
def convert_and_save_file(afile, container, portal_type, output_name, fmt='pdf', from_uid=None, attributes=None):
    """
    Convert a file to another libreoffice readable format using appy.pod and save it in a NamedBlobFile.

    :param afile: file field content like NamedBlobFile
    :param container: container object to create new file
    :param portal_type: portal type
    :param output_name: output name
    :param fmt: output format, default to 'pdf'
    :param from_uid: uid from original file object
    :param attributes: dict of other attributes to set on created content (left untouched)
    :return: the created content
    """
    # NOTE(review): convert_file returns an empty content when the conversion fails; the content is
    # then still created with an empty file - to confirm this is wanted.
    converted_filename, converted_file = convert_file(afile, output_name, fmt=fmt)
    file_object = NamedBlobFile(converted_file, filename=safe_unicode(converted_filename))
    # work on a copy so the dict given by the caller is not modified
    attributes = dict(attributes) if attributes else {}
    attributes["conv_from_uid"] = from_uid
    new_file = createContentInContainer(
        container,
        portal_type,
        title=converted_filename,
        file=file_object,
        **attributes)
    if from_uid:
        annot = IAnnotations(new_file)
        annot["documentgenerator"] = {"conv_from_uid": from_uid}
    return new_file
×
284

285

286
@api.env.mutually_exclusive_parameters("document", "document_uid")
def need_mailing_value(document=None, document_uid=None):
    """
    Tell if the "need_mailing" flag is set in the "documentgenerator" annotation of a document.

    :param document: the document object
    :param document_uid: uid used to get the document when document is not given
    :return: True when the annotation exists and its "need_mailing" value is true, else False
    """
    if not document:
        document = uuidToObject(document_uid, unrestricted=True)
    annotations = IAnnotations(document)
    if "documentgenerator" not in annotations:
        return False
    return bool(annotations["documentgenerator"].get("need_mailing", False))
×
294

295

296
def odfsplit(content):
    """Splits an ODT document into a series of sub-documents. The split is based on page breaks.

    The content is written in a temporary file given to the bin/odfsplit command. Temporary files
    are removed while the returned generator is consumed (or closed after being started), and
    immediately when the command fails.

    :param content: The binary content of the ODT file to be split.
    :return: A tuple containing the exit code, a generator yielding the binary content of each subfile and
             the number of files. When the command fails or has no output, the second item is the
             error output as a string and the number of files is 0.
    """

    def get_subfiles(temp_file, nb_files):
        # with a single file, the document to return is the temporary file itself
        if nb_files == 1:
            subfiles = [temp_file]
        else:
            # temp_file always ends with ".odt": only this suffix is changed, not the directories
            stem = temp_file[:-len(".odt")]
            subfiles = ["{}.{}.odt".format(stem, i) for i in range(1, nb_files + 1)]
        leftovers = list(subfiles)
        if temp_file not in leftovers:
            leftovers.append(temp_file)
        try:
            for subfile in subfiles:
                with open(subfile, "rb") as sf:
                    data = sf.read()
                remove_tmp_file(subfile)
                leftovers.remove(subfile)
                yield data
        finally:
            # also reached when reading fails or when the consumer stops before the end
            for path in leftovers:
                if os.path.exists(path):
                    remove_tmp_file(path)

    temp_file = temporary_file_name(suffix=".odt")
    with open(temp_file, "wb") as f:
        f.write(content)
    command = "{pwd}/bin/odfsplit {temp_file}".format(temp_file=temp_file, pwd=BLDT_DIR)
    try:
        out, err, code = runCommand(command)
    except Exception:
        remove_tmp_file(temp_file)
        raise
    if not (out and code == 0):
        # nothing will consume a generator: remove the temporary file now
        remove_tmp_file(temp_file)
        return code, "".join(err), 0
    part0 = out[0].split(" ")[0]  # Ex: "2 files were generated."
    # without a leading number, the document is handled as a single file
    nb_files = int(part0) if part0.isdigit() else 1
    return code, get_subfiles(temp_file, nb_files), nb_files
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc