• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kit-data-manager / tomo_mapper / 16725814017

04 Aug 2025 02:09PM UTC coverage: 86.311% (-3.3%) from 89.629%
16725814017

push

github

web-flow
Release v1.1.0

350 of 449 new or added lines in 22 files covered. (77.95%)

9 existing lines in 2 files now uncovered.

2396 of 2776 relevant lines covered (86.31%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.8
/src/util.py
1
import json
1✔
2
import logging
1✔
3
from pathlib import Path
1✔
4

5
from magika import Magika
1✔
6
import os
1✔
7
import tempfile
1✔
8
import time
1✔
9
from json import JSONDecodeError
1✔
10
from typing import Optional
1✔
11
import configparser
1✔
12

13
import requests
1✔
14
import zipfile
1✔
15

16
import xmltodict
1✔
17
from xml.parsers.expat import ExpatError
1✔
18

19
from src.IO.MappingAbortionError import MappingAbortionError
1✔
20
import re
1✔
21

22
def robust_textfile_read(filepath):
    """
    Read a text file as UTF-8, falling back to Latin-1 if it is not valid UTF-8.

    :param filepath: path of the text file to read
    :return: the decoded file content as a string (without a leading UTF-8 byte order mark)
    """
    try:
        # "utf-8-sig" decodes plain UTF-8 exactly like "utf-8" but additionally drops a
        # leading byte order mark. With plain "utf-8" the mark would stay in the result as
        # "\ufeff", which json.loads rejects and which defeats startswith-based sniffing.
        with open(filepath, 'r', encoding="utf-8-sig") as file:
            return file.read()
    except UnicodeDecodeError:
        # Latin-1 assigns a character to every possible byte value, so this decode cannot
        # raise UnicodeDecodeError; no further encoding fallback is required.
        with open(filepath, 'r', encoding="latin1") as file:
            return file.read()
×
34

35
def load_json(source, timeout=30) -> Optional[dict]:
    """
    Load JSON data from a local file path or a web URL.

    :param source: A string representing either a local file path or a web URL.
    :param timeout: Timeout in seconds passed to ``requests.get`` when source is a URL;
                    ignored for local files. Without a timeout the request could block forever.
    :return: Parsed JSON data.
    :raises requests.RequestException: if the download fails, times out or returns a bad status code
    :raises json.JSONDecodeError: if the local file does not contain valid JSON
    """
    if source.startswith(('http://', 'https://')):
        response = requests.get(source, timeout=timeout)
        response.raise_for_status()  # Raise an error for bad status codes
        return response.json()
    return json.loads(robust_textfile_read(source))
1✔
48

49
def is_zipfile(filepath):
    """
    Tell whether the given file is a ZIP archive.

    :param filepath: path of the file to check
    :return: result of zipfile.is_zipfile for the given path
    """
    looks_like_zip = zipfile.is_zipfile(filepath)
    return looks_like_zip
1✔
51

52
def extract_zip_file(zip_file_path):
    """
    extracts all files of a zip archive to a newly created temporary directory

    The temporary directory is not removed by this function on success.
    If extraction fails, the directory is removed again before the error is re-raised.

    :param zip_file_path: local file path to zip file
    :return: path to the temporary directory containing the extracted files
    :raises zipfile.BadZipFile: if the file is not a valid zip archive
    """
    import shutil  # local import, only needed to clean up after a failed extraction

    temp_dir = tempfile.mkdtemp()

    start_time = time.time()  # Start time
    logging.info(f"Extracting {zip_file_path}...")

    try:
        with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
            zip_ref.extractall(temp_dir)
    except BaseException:
        # The caller never learns the directory name when we fail, so nobody else
        # could clean it up; remove it here and let the original error propagate.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise

    total_time = time.time() - start_time

    logging.info(f"Total time taken to process: {total_time:.2f} seconds.")
    return temp_dir
1✔
79

80
def strip_workdir_from_path(workdirpath, fullpath):
    """
    Replace a leading working directory in a path by ".".

    The working directory is only stripped when it matches whole path components, so
    "/tmp/work" is not treated as a prefix of "/tmp/workspace/file". Trailing path
    separators on the working directory are ignored.

    :param workdirpath: path of the working directory
    :param fullpath: path expected to be located inside the working directory
    :return: fullpath with the working directory replaced by ".", or fullpath unchanged
             if it does not start with the working directory
    """
    separators = tuple(sep for sep in (os.sep, os.altsep) if sep)
    prefix = workdirpath.rstrip("".join(separators))
    if fullpath.startswith(prefix):
        remainder = fullpath[len(prefix):]
        # Only accept the match at a component boundary: either nothing is left or the
        # remainder continues with a path separator.
        if not remainder or remainder.startswith(separators):
            return "." + remainder
    logging.debug("Unable to remove working directory from given path. Returning unchanged path")
    return fullpath
×
85

86
def _ini_to_dict(ini_text):
    """Parse INI text into {section: {option: value}} while keeping the case of option names."""
    config = configparser.ConfigParser()
    config.optionxform = str #do this if you do not want to read in data as lowercase
    config.read_string(ini_text)
    return {section: dict(config.items(section)) for section in config.sections()}


def _dollar_lines_to_dict(text):
    """Collect "$KEY value" / "$$KEY value" lines into a dict; lines of any other shape are skipped."""
    result = {}
    for line in text.strip().split("\n"):
        match = re.match(r"^(\${1,2}[\w_]+)\s+(.*)", line.strip())
        if match:
            key, value = match.groups()
            result[key] = value.strip()
    return result


def _key_value_lines_to_dict(text):
    """Loosely parse "key = value" / "key: value" lines; dots are removed from the keys."""
    result = {}
    for line in text.replace("\r", "").split("\n"):
        # "=" takes precedence over ":" when a line contains both
        for separator in ("=", ":"):
            if separator in line:
                key, value = line.split(separator, 1)
                result[key.strip().replace(".", "")] = value.strip()
                break
    return result


def input_to_dict(stringPayload, stick_to_wellformed=False) -> Optional[dict]:
    """
    best effort parsing of usual input formats. extend if needed

    The format is guessed from the first character: "<" XML, "{" JSON, "[" INI,
    "$" text with "$KEY value" lines. If a guessed reader fails, the remaining checks
    are still tried, ending with a loose "key = value" / "key: value" line parser.

    :param stringPayload: string to parse
    :param stick_to_wellformed: if True, the loose line-based fallback parser is not used
    :return: dict on success, None otherwise
    """
    if not isinstance(stringPayload, str):
        return None
    try:
        if stringPayload.startswith("<"):
            try:  # XML
                return xmltodict.parse(stringPayload)
            except ExpatError:
                logging.debug("Reading in input as xml not successful")
        if stringPayload.startswith("{"):
            try: #JSON
                return json.loads(stringPayload)
            except JSONDecodeError:
                logging.debug("Reading input as json not successful")
        if stringPayload.startswith("["): #could still be json, but would not create a dict so not a valid input anyway
            try: #INI
                return _ini_to_dict(stringPayload)
            except configparser.Error:
                # configparser.Error is the base class of every parser error (missing section
                # header, duplicate section/option, parsing and interpolation errors), so any
                # INI failure leads on to the remaining readers instead of aborting.
                logging.debug("Reading input as INI not successful")
        if stringPayload.startswith("$"):  # Check if the input starts with "$"
            return _dollar_lines_to_dict(stringPayload) #TXT
        if not stick_to_wellformed and "\n" in stringPayload: #We try our best, but if this is not wanted, please stick to wellformed formats instead
            output_dict = _key_value_lines_to_dict(stringPayload)
            if output_dict:
                return output_dict
        logging.warning("Best effort input reading failed. Necessary reader not implemented?")
    except Exception as e:
        logging.warning("Best effort input reading failed with unexpected error. Input malformed?")
        logging.error(e)
    return None
×
149

150
def normalize_path(pathString):
    """
    Re-join a backslash separated path using os.path.join.

    :param pathString: path string that may contain backslashes as separators
    :return: the backslash separated segments joined via os.path.join, or the
             unchanged input if it contains no backslash
    """
    if "\\" not in pathString:
        return pathString
    segments = pathString.split("\\")
    return os.path.join(*segments)
×
153

154
def get_filetype_with_magica(filepath):
    """
    Identify the MIME type of a file with Magika.

    :param filepath: path of the file to inspect
    :return: the mime_type of the output of Magika's identification result
    """
    identification = Magika().identify_path(Path(filepath))
    return identification.output.mime_type
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc