• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rdmorganiser / rdmo / 20164756478

12 Dec 2025 11:04AM UTC coverage: 94.814% (+0.02%) from 94.796%
20164756478

Pull #1427

github

web-flow
Merge 0c7b64b9d into 79917de8d
Pull Request #1427: RDMO 2.4.0 🎆

2124 of 2229 branches covered (95.29%)

22688 of 23929 relevant lines covered (94.81%)

3.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.68
rdmo/core/xml.py
1
import logging
4✔
2
import re
4✔
3
from collections import OrderedDict
4✔
4
from pathlib import Path
4✔
5
from xml.etree.ElementTree import Element as xmlElement
4✔
6

7
from django.utils.translation import gettext_lazy as _
4✔
8

9
import defusedxml.ElementTree as ET
4✔
10
from packaging.version import Version, parse
4✔
11

12
from rdmo import __version__
4✔
13
from rdmo.core.constants import RDMO_MODELS
4✔
14
from rdmo.core.imports import ImportElementFields
4✔
15

16
# Module-level logger, named after this module, used by the functions below.
logger = logging.getLogger(__name__)
4✔
17

18
# Version assumed for an XML root element that carries no (or an empty) "version" attribute.
LEGACY_RDMO_XML_VERSION = '1.11.0'
4✔
19
# Models that are skipped by the "key" field check applied to XML older than 2.0.0.
ELEMENTS_USING_KEY = {RDMO_MODELS['attribute']}
4✔
20

21

22
def resolve_file(file_name: str) -> tuple[Path | None, str | None]:
4✔
23
    file = Path(file_name).resolve()
4✔
24
    if file.exists():
4✔
25
        return file, None
4✔
26
    return  None, _('This file does not exists.')
4✔
27

28

29
def read_xml(file: Path) -> tuple[xmlElement | None, str | None]:
4✔
30
    # step 2: parse xml and get the root
31
    try:
4✔
32
        root = ET.parse(file).getroot()
4✔
33
        return root, None
4✔
34
    except Exception as e:
4✔
35
        return None, _('XML Parsing Error') + f': {e!s}'
4✔
36

37

38
def validate_root(root: xmlElement | None) -> tuple[bool, str | None]:
4✔
39
    if root is None:
4✔
40
        return False, _('The content of the XML file does not consist of well-formed data or markup.')
4✔
41
    if root.tag != 'rdmo':
4✔
42
        return False, _('This XML does not contain RDMO content.')
4✔
43
    return True, None
4✔
44

45

46
def validate_and_get_xml_version_from_root(root: xmlElement) -> tuple[Version | None, list]:
    """Read and validate the version information on the XML root element.

    The optional ``required`` attribute must be a parsable version that is not
    newer than the running RDMO version. The ``version`` attribute (or
    ``LEGACY_RDMO_XML_VERSION`` when it is missing or empty) must be parsable.

    Returns ``(version, [])`` on success and ``(None, errors)`` otherwise.
    """
    rdmo_version = parse(__version__)

    # Extract version attributes from the XML root
    root_attributes = root.attrib
    required_attribute = root_attributes.get('required')  # New required version field
    version_attribute = root_attributes.get('version') or LEGACY_RDMO_XML_VERSION  # Fallback to legacy default

    # Validate the 'required' attribute if it exists
    if required_attribute:
        try:
            required_version = parse(required_attribute)
        except ValueError:
            logger.info('Import failed: Invalid "required" format in XML (%s)', required_attribute)
            return None, [_('The "required" attribute in this RDMO XML file is not a valid version.')]

        if required_version > rdmo_version:
            logger.info('Import failed: Required version (%s) > RDMO instance version (%s)', required_version,
                        rdmo_version)
            return None, [
                _('This RDMO XML file requires a newer RDMO version to be imported.'),
                f'Required version: {required_version}, Current version: {rdmo_version}.'
            ]

    # Fallback to validate the legacy 'version' field
    try:
        xml_version = parse(version_attribute)
    except ValueError:
        logger.info('Import failed: Invalid "version" format in XML (%s)', version_attribute)
        return None, [_('The "version" attribute in this RDMO XML file is not a valid version.')]
    return xml_version, []
4✔
79

80

81
def validate_legacy_elements(elements: dict, root_version: Version) -> list[str]:
    """Run the pre-conversion 'key' check and report a failure as messages.

    Returns an empty list when the check passes; when it raises ``ValueError``
    the failure is logged and returned as a list of two error messages.
    """
    try:
        validate_pre_conversion_for_missing_key_in_legacy_elements(elements, root_version)
    except ValueError as e:
        logger.info('Import failed with ValueError (%s)', str(e))
        return [
            _('XML Parsing Error') + f': {e!s}',
            _('This is not a valid RDMO XML file.'),
        ]
    return []
4✔
93

94

95
def parse_elements(root: xmlElement) -> tuple[dict, str | None]:
    """Build the element dicts from the XML root.

    Returns ``(elements, None)`` on success. A ``KeyError``, ``TypeError`` or
    ``AttributeError`` raised while flattening is logged and reported as
    ``({}, message)``.
    """
    # step 3: create element dicts from xml
    try:
        parsed_elements = flat_xml_to_elements(root)
    except (KeyError, TypeError, AttributeError) as e:
        logger.info('Import failed with %s (%s)', type(e).__name__, e)
        return {}, _('This is not a valid RDMO XML file.')
    return parsed_elements, None
×
103

104

105
def parse_xml_to_elements(xml_file=None) -> tuple[OrderedDict, list]:
    """Read an RDMO XML file and return its converted elements.

    The pipeline is: resolve the path, parse the XML, validate the root,
    flatten the XML into element dicts, validate the version attributes,
    check legacy elements, and finally convert elements from older versions.

    Returns ``(elements, errors)``. As soon as one stage fails, an empty
    ``OrderedDict`` is returned together with the errors collected so far.
    """
    file, file_error = resolve_file(xml_file)
    if file_error is not None:
        logger.error(file_error)
        return OrderedDict(), [file_error]

    errors = []
    root, read_error = read_xml(file)
    if read_error:
        logger.error(read_error)
        errors.append(read_error)

    # step 2.1: validate the xml root
    root_is_valid, root_validation_error = validate_root(root)
    if root_is_valid is not True:
        logger.error('Root element validation failed. %s', root_validation_error)
        # the root validation message goes in front of any read error
        return OrderedDict(), [root_validation_error, *errors]

    # step 3: create element dicts from xml
    elements, parsing_error = parse_elements(root)
    if parsing_error is not None:
        return OrderedDict(), [*errors, parsing_error]

    # step 3.1: validate version
    root_version, version_errors = validate_and_get_xml_version_from_root(root)
    if version_errors:
        return OrderedDict(), [*errors, *version_errors]

    # step 3.1.1: validate the legacy elements
    legacy_errors = validate_legacy_elements(elements, root_version)
    if legacy_errors:
        return OrderedDict(), [*errors, *legacy_errors]

    # step 4: convert elements from previous versions
    converted_elements = convert_elements(elements, root_version)

    # step 5: order the elements and return
    # ordering of elements is done in the import_elements function

    logger.info('XML parsing of %s success (length: %s).', file.name, len(converted_elements))

    return converted_elements, errors
4✔
155

156

157
def read_xml_file(file_name, raise_exception=False):
    """Parse an XML file and return its root element.

    Args:
        file_name: the source handed to ``ET.parse``.
        raise_exception: when true, a parsing failure is re-raised after it
            has been logged; when false it is only logged.

    Returns:
        The root element, or ``None`` when parsing failed and
        ``raise_exception`` is false.

    Raises:
        Exception: whatever ``ET.parse`` raised, if ``raise_exception`` is true.
    """
    try:
        return ET.parse(file_name).getroot()
    except Exception as e:
        logger.error('Xml file parsing error at getroot: %s', str(e))
        if raise_exception:
            # bare raise keeps the original traceback; ``raise e from e`` would
            # make the exception its own __cause__
            raise
        return None
×
164

165

166
def parse_xml_string(string):
    """Parse XML from ``string`` and return the root element.

    Any exception raised by the parser is logged and ``None`` is returned.
    """
    try:
        root = ET.fromstring(string)
    except Exception as e:
        logger.error('Xml parsing from string error: %s', str(e))
        return None
    return root
×
171

172

173
def flat_xml_to_elements(root) -> dict:
    """Flatten the direct children of ``root`` into a ``{uri: element}`` dict.

    Every child node becomes a dict with its ``dc:uri`` and its model (looked
    up in ``RDMO_MODELS`` by tag). Each sub node adds one entry:

    * a sub node with a ``dc:uri`` attribute becomes ``{'uri': ...}`` (plus
      ``'model'`` when its tag is in ``RDMO_MODELS``),
    * a sub node with a ``lang`` attribute is stored as ``<tag>_<lang>``,
    * a sub node with children becomes a list of ``{'uri', 'model', 'order'}``
      dicts (``model`` and ``order`` only when available),
    * an empty or whitespace-only sub node becomes ``None``,
    * anything else is stored as the node text.
    """
    ns_map = get_ns_map(root)
    uri_attrib = get_ns_tag('dc:uri', ns_map)

    elements = {}
    for node in root:
        uri = get_uri(node, ns_map)
        element = {'uri': uri, 'model': RDMO_MODELS[node.tag]}

        for sub_node in node:
            tag = strip_ns(sub_node.tag, ns_map)
            attributes = sub_node.attrib

            if uri_attrib in attributes:
                # this node has an uri!
                reference = {'uri': attributes[uri_attrib]}
                if sub_node.tag in RDMO_MODELS:
                    reference['model'] = RDMO_MODELS[sub_node.tag]
                element[tag] = reference

            elif 'lang' in attributes:
                # this node has the lang attribute!
                language = attributes['lang']
                element[f'{tag}_{language}'] = sub_node.text

            elif len(sub_node) > 0:
                # this node is a list!
                children = []
                for child in sub_node:
                    child_entry = {'uri': child.attrib[uri_attrib]}
                    if child.tag in RDMO_MODELS:
                        child_entry['model'] = RDMO_MODELS[child.tag]
                    if 'order' in child.attrib:
                        child_entry['order'] = child.attrib['order']
                    children.append(child_entry)
                element[tag] = children

            else:
                text = sub_node.text
                has_content = text is not None and bool(text.strip())
                element[tag] = text if has_content else None

        elements[uri] = element

    return elements
4✔
220

221

222
def get_ns_tag(tag, ns_map):
    """Expand a prefixed tag such as ``dc:uri`` to ``{namespace}uri``.

    Args:
        tag: tag name in ``prefix:name`` form.
        ns_map: mapping of namespace prefix to namespace URI.

    Returns:
        The expanded tag, or ``None`` when ``tag`` has no ``prefix:`` part or
        the prefix is not in ``ns_map``.
    """
    prefix, separator, local_name = tag.partition(':')
    if not separator:
        # no prefix at all: nothing to expand (indexing the split result
        # would raise IndexError here)
        return None
    try:
        return f'{{{ns_map[prefix]}}}{local_name}'
    except KeyError:
        return None
4✔
228

229

230
def get_ns_map(treenode):
    """Collect the namespace prefixes declared in the serialised ``treenode``.

    The node is serialised back to XML and every ``xmlns:<prefix>="<uri>"``
    declaration found in that text is returned as ``{prefix: uri}``.
    """
    # serialise straight to str: calling str() on bytes would search the
    # b'...' repr, in which non-ASCII characters and backslashes are escaped
    treestring = ET.tostring(treenode, encoding='unicode', method='xml')

    return {
        match.group(2): match.group(4)
        for match in re.finditer(r'(xmlns:)(.*?)(=")(.*?)(")', treestring)
    }
4✔
239

240

241
def get_uri(treenode, ns_map):
    """Return the ``dc:uri`` attribute of ``treenode``.

    Returns ``None`` when the node is ``None``, when the ``dc`` prefix cannot
    be resolved through ``ns_map``, or when the attribute is not set.
    """
    if treenode is None:
        return None

    uri_attribute = get_ns_tag('dc:uri', ns_map)
    if uri_attribute is None:
        return None

    return treenode.attrib.get(uri_attribute)
4✔
246

247

248
def strip_ns(tag, ns_map):
    """Remove a leading ``{namespace}`` qualifier from ``tag``.

    The first namespace in ``ns_map`` whose ``{uri}`` form prefixes the tag is
    removed; a tag without a known namespace prefix is returned unchanged.
    """
    for namespace in ns_map.values():
        qualifier = f'{{{namespace}}}'
        if tag.startswith(qualifier):
            # only drop the leading qualifier, never later occurrences
            return tag.removeprefix(qualifier)
    return tag
4✔
253

254

255
def convert_elements(elements, version: Version):
    """Upgrade ``elements`` parsed from an XML of the given ``version``.

    Conversions are applied in order, each only when ``version`` is older than
    the release that introduced the change: legacy elements (2.0.0, after the
    'key' check), additional input (2.1.0) and autocomplete widgets (2.3.0).
    """
    if version < parse('2.0.0'):
        validate_pre_conversion_for_missing_key_in_legacy_elements(elements, version)
        elements = convert_legacy_elements(elements)

    later_conversions = (
        ('2.1.0', convert_additional_input),
        ('2.3.0', convert_autocomplete),
    )
    for introduced_in, convert in later_conversions:
        if version < parse(introduced_in):
            elements = convert(elements)

    return elements
4✔
267

268

269
def validate_pre_conversion_for_missing_key_in_legacy_elements(elements, version: Version) -> None:
    """Check that XML older than 2.0.0 contains elements with a 'key' field.

    Nothing is checked for version 2.0.0 or newer, or when every element model
    is in ``ELEMENTS_USING_KEY`` (which includes the empty case).

    Raises:
        ValueError: when none of the remaining elements has a 'key' entry.
    """
    if not version < parse('2.0.0'):
        return

    models_in_elements = {element['model'] for element in elements.values()}
    if models_in_elements.issubset(ELEMENTS_USING_KEY):
        # xml contains only domain.attribute or is empty
        return

    # inspect the elements for missing 'key' fields
    key_found = any(
        'key' in element
        for element in elements.values()
        if element['model'] not in ELEMENTS_USING_KEY
    )
    if not key_found:
        raise ValueError(f"Missing legacy elements, elements containing 'key' were expected for this XML with version {version} and elements {models_in_elements}.")   # noqa: E501
4✔
279

280

281
def update_related_legacy_elements(elements: dict,
                                   target_uri: str, source_model: str,
                                   legacy_element_field: str, element_field: str):
    """Attach the elements that reference ``target_uri`` to the target element.

    Args:
        elements: all element dicts, keyed by uri; modified in place.
        target_uri: uri of the element that receives the related elements.
        source_model: only elements with this model are considered.
        legacy_element_field: field on the source elements that holds the
            ``{'uri': ...}`` reference to the target.
        element_field: field on the target element that is overwritten with
            the list of related elements, reduced to 'uri', 'model' and 'order'.
    """
    # search for the related elements that use the uri
    related_elements = []
    for element in elements.values():
        if element['model'] != source_model:
            continue
        legacy_reference = element.get(legacy_element_field, {})
        # the field can also hold None (empty XML node) or plain text; such
        # values cannot reference the target and must not break the lookup
        if isinstance(legacy_reference, dict) and legacy_reference.get('uri') == target_uri:
            related_elements.append(element)

    # write the related elements back into the related element
    elements[target_uri][element_field] = [
        {k: v for k, v in element.items() if k in ('uri', 'model', 'order')}
        for element in related_elements
    ]
295

296

297
def convert_legacy_elements(elements):
    """Convert elements from XML older than 2.0.0 to the current structure.

    The element dicts are modified in place and the same ``elements`` mapping
    is returned. Two passes are made:

    1. ``questions.questionset`` elements without a parent ``questionset``
       are relabelled as ``questions.page``; nested questionsets lose their
       ``section`` entry.
    2. Per model, the legacy ``key`` (or ``path``) becomes ``uri_path``, the
       back-references to parents are removed and the parents receive lists
       of their children instead.

    NOTE(review): the second pass removes back-references (e.g. ``catalog``
    on sections, ``section`` on pages) while iterating, so the result looks
    dependent on parents being visited before their children - confirm
    against the element order of legacy XML files.
    """
    # first pass: identify pages
    for _uri, element in elements.items():
        if element['model'] == 'questions.questionset':
            if element.get('questionset') is None:
                # this is now a page
                element['model'] = 'questions.page'
            else:
                del element['section']

    # second pass: del key, set uri_path, add order to reverse m2m through models
    # and sort questions into pages or questionsets
    for uri, element in elements.items():
        if element['model'] == 'conditions.condition':
            element['uri_path'] = element.pop('key')

        elif element['model'] == 'questions.catalog':
            element['uri_path'] = element.pop('key')
            # Add sections to the catalog
            update_related_legacy_elements(elements, uri, 'questions.section', 'catalog', 'sections')

        elif element['model'] == 'questions.section':
            del element['key']
            element['uri_path'] = element.pop('path')
            del element['catalog']  # sections do not have catalog anymore
            # Add section_pages to the section
            update_related_legacy_elements(elements, uri, 'questions.page', 'section', 'pages')

        elif element['model'] == 'questions.page':
            del element['key']
            element['uri_path'] = element.pop('path')
            del element['section']  # pages do not have sections anymore

            # Add page_questionsets to the page
            # Add questionsets to the page
            update_related_legacy_elements(elements, uri, 'questions.questionset', 'questionset', 'questionsets')

            # Add page_questions to the page
            update_related_legacy_elements(elements, uri, 'questions.question', 'question', 'questions')

            # Add page_conditions to the page
            update_related_legacy_elements(elements, uri, 'conditions.condition', 'condition', 'conditions')

        elif element['model'] == 'questions.questionset':
            # only nested questionsets reach this branch, top-level ones were
            # relabelled as pages in the first pass
            del element['key']
            element['uri_path'] = element.pop('path')

            parent = element.get('questionset').get('uri')
            if parent is not None:
                if elements[parent].get('model') == 'questions.page':
                    # this questionset belongs to a page now
                    parent_questionsets = elements[parent].get('questionset')
                    parent_questionsets = parent_questionsets or []
                    parent_questionsets.append({
                        'uri': element['uri'],
                        'model': element['model'],
                        'order': element.pop('order')
                    })
                    # note: the list is stored under the parent's 'questionset' key
                    elements[parent]['questionset'] = parent_questionsets
                    del element['questionset']
                else:
                    # this questionset still belongs to a questionset
                    element['questionset']['order'] = element.pop('order')

        elif element['model'] == 'questions.question':
            del element['key']
            element['uri_path'] = element.pop('path')

            parent = element.get('questionset').get('uri')
            if parent is not None:
                # move the question (with its order) into the parent's 'questions' list
                parent_questionsets = elements[parent].get('questions', [])
                parent_questionsets.append({
                    'uri': element['uri'],
                    'model': element['model'],
                    'order': element.pop('order')
                })
                elements[parent]['questions'] = parent_questionsets
                del element['questionset']

        elif element['model'] == 'options.optionset':
            element['uri_path'] = element.pop('key')

            update_related_legacy_elements(elements, uri, 'options.option', 'optionset', 'options')

        elif element['model'] == 'options.option':
            del element['key']
            element['uri_path'] = element.pop('path')

            del element['optionset']  # options do not have optionsets anymore


        # tasks and views are handled by separate checks, outside the elif chain above
        if element['model'] == 'tasks.task':
            element['uri_path'] = element.pop('key')

        if element['model'] == 'views.view':
            element['uri_path'] = element.pop('key')

    return elements
4✔
395

396

397
def convert_additional_input(elements):
    """Normalise ``additional_input`` on every ``options.option`` element.

    Values that already are one of ``''``, ``'text'`` or ``'textarea'`` are
    kept, the string ``'True'`` becomes ``'text'`` and everything else
    (including a missing value) becomes ``''``. The elements are modified in
    place and the same mapping is returned.
    """
    for element in elements.values():
        if element['model'] != 'options.option':
            continue

        current_value = element.get('additional_input')
        if current_value in ('', 'text', 'textarea'):  # from Option.ADDITIONAL_INPUT_CHOICES
            continue

        element['additional_input'] = 'text' if current_value == 'True' else ''

    return elements
4✔
409

410

411
def convert_autocomplete(elements):
    """Rename the autocomplete widget types on ``questions.question`` elements.

    ``'autocomplete'`` becomes ``'select'`` and ``'freeautocomplete'`` becomes
    ``'select_creatable'``; every other widget type is left untouched, and so
    is a question without a ``widget_type`` entry. The elements are modified
    in place and the same mapping is returned.
    """
    for element in elements.values():
        if element['model'] != 'questions.question':
            continue

        # .get: a question without widget_type has nothing to convert and
        # must not abort the conversion with a KeyError
        widget_type = element.get('widget_type')
        if widget_type == 'autocomplete':
            element['widget_type'] = 'select'
        elif widget_type == 'freeautocomplete':
            element['widget_type'] = 'select_creatable'

    return elements
4✔
420

421

422
def order_elements(elements: OrderedDict) -> OrderedDict:
    """Return the elements reordered so that referenced elements come first.

    The elements are visited in reverse insertion order and handed to
    ``append_element``, which inserts the elements an element refers to
    before the element itself.
    """
    ordered_elements = OrderedDict()
    for uri in reversed(elements):
        append_element(ordered_elements, elements, uri, elements[uri])
    return ordered_elements
4✔
427

428

429
def append_element(ordered_elements, unordered_elements, uri, element) -> None:
    """Insert ``element`` into ``ordered_elements`` after the elements it references.

    Every dict value and every item of a list value is treated as a reference:
    when its ``'uri'`` is set and not yet in ``ordered_elements``, the
    referenced element is looked up in ``unordered_elements`` and appended
    first (recursively). Keys listed in ``ImportElementFields`` are skipped.
    Nothing happens when ``element`` is ``None``.
    """
    if element is None:
        return

    for key, element_value in element.items():
        if key in list(ImportElementFields):
            continue

        # treat a single reference and a list of references the same way
        if isinstance(element_value, dict):
            references = [element_value]
        elif isinstance(element_value, list):
            references = element_value
        else:
            continue

        for reference in references:
            sub_uri = reference.get('uri')
            sub_element = unordered_elements.get(sub_uri)
            if sub_uri is not None and sub_uri not in ordered_elements:
                append_element(ordered_elements, unordered_elements, sub_uri, sub_element)

    if uri not in ordered_elements:
        ordered_elements[uri] = element
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc