4dn-dcic / utils / 6736812910

02 Nov 2023 06:32PM UTC · coverage: 78.593% (+0.008%) from 78.585%
Build 6736812910 · push · github · web-flow

Merge pull request #292 from 4dn-dcic/smaht-ingestion-related
Misc changes related to SMaHT ingestion.

24 of 24 new or added lines in 3 files covered (100.0%)
9340 of 11884 relevant lines covered (78.59%)
0.79 hits per line

Source File: /dcicutils/validation_utils.py (file coverage: 90.0%)
import contextlib
import json
import jsonschema
import re

from typing import Dict, List, Optional
from .common import AnyJsonData
from .ff_utils import get_schema
from .env_utils import EnvUtils, public_env_name
from .lang_utils import there_are, maybe_pluralize, disjoined_list
from .misc_utils import AbstractVirtualApp, PRINT, to_snake_case
from .sheet_utils import JsonSchema, TabbedJsonSchemas, SheetData, TabbedSheetData
from .task_utils import pmap


class SchemaManager:

    @classmethod
    @contextlib.contextmanager
    def fresh_schema_manager_context_for_testing(cls):
        # TODO: Remove references to this once reimplementation using an instance variable for SCHEMA_CACHE is working.
        yield
        # old_schema_cache = cls.SCHEMA_CACHE
        # try:
        #     cls.SCHEMA_CACHE = {}
        #     yield
        # finally:
        #     cls.SCHEMA_CACHE = old_schema_cache

    def __init__(self, *, override_schemas: Optional[TabbedJsonSchemas] = None,
                 portal_env: Optional[str] = None, portal_vapp: Optional[AbstractVirtualApp] = None):
        self.SCHEMA_CACHE = {}  # Shared cache. Do not override. Use .clear_schema_cache() to clear it.
        if portal_env is None and portal_vapp is None:
            portal_env = public_env_name(EnvUtils.PRD_ENV_NAME)
            PRINT(f"The portal_env was not explicitly supplied. Schemas will come from portal_env={portal_env!r}.")
        self.portal_env = portal_env
        self.portal_vapp = portal_vapp
        self.override_schemas = (
            {}
            if override_schemas is None
            else {to_snake_case(key): value  # important to both canonicalize the case and copy the dict
                  for key, value in override_schemas.items()}
        )

    def fetch_relevant_schemas(self, schema_names: List[str]):  # , schemas: Optional[TabbedSchemas] = None):
        # if schemas is None:
        #     schemas = self.schemas
        # The schema_names argument is not normally given, but it is there for easier testing
        def name_and_schema(schema_name):
            # cached_schema = self.schemas.get(schema_name)  # schemas.get(schema_name)
            # schema = self.fetch_schema(schema_name) if cached_schema is None else cached_schema
            return schema_name, self.fetch_schema(schema_name)
        return {schema_name: schema
                for schema_name, schema in pmap(name_and_schema, schema_names)}

    def schema_exists(self, schema_name: str):
        return bool(self.fetch_schema(schema_name=schema_name))

    def fetch_schema(self, schema_name: str):
        schema_name = to_snake_case(schema_name)
        override_schema = self.override_schemas.get(schema_name)
        if override_schema is not None:
            return override_schema
        schema: Optional[AnyJsonData] = self.SCHEMA_CACHE.get(schema_name)
        if schema is None and schema_name not in self.SCHEMA_CACHE:  # If None is already stored, don't look it up again
            schema = get_schema(schema_name, portal_env=self.portal_env, portal_vapp=self.portal_vapp)
            self.SCHEMA_CACHE[schema_name] = schema
        return schema
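
    # Example (hypothetical schema and environment name, for illustration only): override
    # schemas short-circuit both the cache and the portal lookup, and schema names are
    # canonicalized to snake case before lookup:
    #
    #     manager = SchemaManager(portal_env="data", override_schemas={"FileFormat": {"type": "object"}})
    #     manager.fetch_schema("file_format")  # ==> {"type": "object"}, no portal request made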

    # Should not be needed, given that SCHEMA_CACHE is an instance variable.
    #
    # @classmethod
    # def clear_schema_cache(cls):
    #     for key in list(cls.SCHEMA_CACHE.keys()):  # important to get the list of keys as a separate object first
    #         cls.SCHEMA_CACHE.pop(key, None)

    def identifying_properties(self, schema: Optional[JsonSchema] = None, schema_name: Optional[str] = None,
                               among: Optional[List[str]] = None):
        schema = schema if schema is not None else self.fetch_schema(schema_name)
        possible_identifying_properties = set(self.get_identifying_properties(schema)) | {'uuid'}
        identifying_properties = sorted(possible_identifying_properties
                                        if among is None
                                        else (prop
                                              for prop in among
                                              if prop in possible_identifying_properties))
        return identifying_properties

    @classmethod  # This operation doesn't actually use the schemas so is safe as a class method
    def identifying_value(cls, data_item: Dict[str, AnyJsonData], identifying_properties) -> AnyJsonData:
        if not identifying_properties:
            raise ValueError("No identifying properties were specified.")
        for identifying_property in identifying_properties:
            if identifying_property in data_item:
                return data_item[identifying_property]
        raise ValueError(f'{there_are(identifying_properties, just_are=True)}'
                         f' no {maybe_pluralize(identifying_properties, "identifying property")}'
                         f' {disjoined_list([repr(x) for x in identifying_properties])}'
                         f' in {json.dumps(data_item)}.')
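
    # Example (hypothetical item, for illustration only): identifying_value returns the value
    # of the first identifying property present in the item, in the order given:
    #
    #     SchemaManager.identifying_value({"accession": "SMAFIPABC123", "description": "x"},
    #                                     ["uuid", "accession"])
    #     # ==> "SMAFIPABC123"
    #
    # If the item has none of the identifying properties, a ValueError is raised.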

    @staticmethod
    def get_identifying_properties(schema: dict) -> list:
        if not schema:
            return []
        identifying_properties = schema.get("identifyingProperties", [])
        # Implicitly add "identifier" to "identifyingProperties", if it exists.
        if "identifier" not in identifying_properties and "identifier" in schema.get("properties", {}):
            identifying_properties.append("identifier")
        return identifying_properties


def validate_data_against_schemas(data: TabbedSheetData, *,
                                  portal_env: Optional[str] = None,
                                  portal_vapp: Optional[AbstractVirtualApp] = None,
                                  override_schemas: Optional[TabbedJsonSchemas] = None) -> Optional[Dict]:
    """
    Validates the given data against the corresponding schema(s). The given data is assumed to
    be in the format returned by sheet_utils, i.e. a dictionary of lists of objects, where each
    top-level dictionary property is the name of a data type for the contained list of objects.
    If no schemas are passed, they will be fetched from the Portal using the given portal_env or
    portal_vapp to access them; the schemas are in a form similar to the data - a dictionary of
    schema objects, where each top-level dictionary property is the name of the data type for the
    contained schema. These data type names (strings) are assumed to be in snake-case form,
    e.g. "file_submitted".

    If there are any absent required properties, any extraneous properties, or any unidentified
    items in the data, then returns a dictionary with an itemized description of each of these errors;
    otherwise returns None if there are no problems. Note that an unidentified item is one which has
    no value for uuid nor for any of the other identifying properties defined by the schema.

    For example, given data that looks something like this:
        {
            "file_format": [
                <object-for-this-type>,
                <another-object-for-this-type>,
                <et-cetera>
            ],
            "file_submitted": [
                <object-for-this-type>,
                <another-object-for-this-type>,
                <et-cetera>
            ]
        }

    this function might return something like this (assuming these errors existed):
        {
            "errors": [
                {   "type": "file_format",
                    "unidentified": true,
                    "index": 2,
                    "identifying_properties": [ "uuid", "file_format" ]
                },
                {   "type": "file_format",
                    "item": "vcf_gz",
                    "index": 1,
                    "missing_properties": [ "standard_file_format" ]
                },
                {   "type": "file_submitted",
                    "item": "ebcfa32f-8eea-4591-a784-449fa5cd9ae9",
                    "index": 3,
                    "extraneous_properties": [ "xyzzy", "foobar" ]
                },
                {   "error": "No schema found for: some_undefined_type"
                }
            ]
        }

    The "item" is the identifying value for the specified object (uuid or another value defined by the schema).
    The "index" is the (0-indexed) ordinal position of the object within the list for its type within
    the given data, which can be useful in locating the object in the source data if it is unidentified.
    """

    schema_manager = SchemaManager(portal_env=portal_env, portal_vapp=portal_vapp, override_schemas=override_schemas)

    errors = []
    schemas = schema_manager.fetch_relevant_schemas(list(data.keys()))

    for data_type in data:
        schema = schemas.get(data_type)
        if not schema:
            if schema is None:  # if schema is {}, we're deliberately suppressing schema checking (not an error)
                errors.append({"error": f"No schema found for: {data_type}"})
            continue
        data_errors = validate_data_items_against_schemas(data[data_type], data_type, schema)
        errors.extend(data_errors)
    return {"errors": errors} if errors else None

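
# Example usage (hypothetical data, identifiers, and environment name, for illustration only),
# assuming a portal schema for "file_format" whose only required property is "standard_file_format":
#
#     data = {"file_format": [{"uuid": "11111111-2222-3333-4444-555555555555", "file_format": "vcf_gz"}]}
#     result = validate_data_against_schemas(data, portal_env="data")
#     # result is None if everything validates, otherwise something like:
#     # {"errors": [{"type": "file_format", "item": "11111111-2222-3333-4444-555555555555",
#     #              "index": 0, "missing_properties": ["standard_file_format"]}]}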

def validate_data_items_against_schemas(data_items: SheetData, data_type: str, schema: JsonSchema) -> List[Dict]:
    """
    Like validate_data_against_schemas but for a simple list of data items, each of the same given data type.
    """
    errors = []
    for data_item_index, data_item in enumerate(data_items):
        data_item_errors = validate_data_item_against_schemas(data_item, data_type, data_item_index, schema)
        errors.extend(data_item_errors)
    return errors


def validate_data_item_against_schemas(data_item: AnyJsonData, data_type: str,
                                       data_item_index: Optional[int], schema: JsonSchema) -> List[Dict]:
    """
    Like validate_data_against_schemas but for a single data item of the given data type.
    The given data item index is just for informational purposes; it corresponds to the
    ordinal index of the data item in its containing list. Uses the standard jsonschema
    package to do the heavy lifting of actual schema validation, but exerts extra effort
    to specifically itemize/aggregate the most common errors (missing and extraneous properties).
    """
    errors = []

    identifying_properties = SchemaManager.get_identifying_properties(schema)
    identifying_value = SchemaManager.identifying_value(data_item, identifying_properties)
    if not identifying_value:
        errors.append({
            "type": data_type,
            "unidentified": True,
            "index": data_item_index,
            "identifying_properties": identifying_properties
        })

    def extract_single_quoted_strings(message: str) -> List[str]:
        return re.findall(r"'(.*?)'", message)

    schema_validator = jsonschema.Draft7Validator(schema)
    for schema_validation_error in schema_validator.iter_errors(data_item):
        if schema_validation_error.validator == "required":
            errors.append({
                "type": data_type,
                "item" if identifying_value else "unidentified": identifying_value if identifying_value else True,
                "index": data_item_index,
                "missing_properties": schema_validation_error.validator_value})
            continue
        if schema_validation_error.validator == "additionalProperties":
            properties = extract_single_quoted_strings(schema_validation_error.message)
            if properties:
                errors.append({
                    "type": data_type,
                    "item" if identifying_value else "unidentified": identifying_value if identifying_value else True,
                    "index": data_item_index,
                    "extraneous_properties": properties})
                continue
        errors.append({
            "type": data_type,
            "item" if identifying_value else "unidentified": identifying_value if identifying_value else True,
            "index": data_item_index,
            "unclassified_error": schema_validation_error.message})

    return errors

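
# Example (hypothetical item and schema, for illustration only), assuming a schema whose
# identifyingProperties and properties include "uuid" and which declares
# "additionalProperties": false; an unexpected property is reported as
# "extraneous_properties" rather than as the raw jsonschema message:
#
#     validate_data_item_against_schemas(
#         {"uuid": "ebcfa32f-8eea-4591-a784-449fa5cd9ae9", "xyzzy": 1}, "file_submitted", 0, schema)
#     # ==> [{"type": "file_submitted", "item": "ebcfa32f-8eea-4591-a784-449fa5cd9ae9",
#     #       "index": 0, "extraneous_properties": ["xyzzy"]}]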

def summary_of_data_validation_errors(data_validation_errors: Dict,
                                      # These next three items are available from a portal's SubmissionFolio
                                      data_file_name: Optional[str] = None,
                                      s3_data_file_location: Optional[str] = None,
                                      s3_details_location: Optional[str] = None) -> List[str]:
    """
    Summarizes the given data validation errors into a short list of simple English phrases.
    This ends up in the additional_properties of the IngestionSubmission object in the
    Portal database (see SubmissionFolio.record_results), and it is what the submitr tool
    displays, if there are any errors, when it detects that processing has completed.
    """
    errors = data_validation_errors.get("errors")
    if not errors:
        return []

    unidentified_count = 0
    missing_properties_count = 0
    extraneous_properties_count = 0
    unclassified_error_count = 0
    exception_count = 0

    for error in errors:
        if error.get("unidentified"):
            unidentified_count += 1
        if error.get("missing_properties"):
            missing_properties_count += 1
        if error.get("extraneous_properties"):
            extraneous_properties_count += 1
        if error.get("unclassified_error"):
            unclassified_error_count += 1
        if error.get("exception"):
            exception_count += 1

    result = [
        f"Ingestion data validation error summary:"
    ]
    if data_file_name:
        result.append(f"Data file: {data_file_name}")
    if s3_data_file_location:
        result.append(f"Data file in S3: {s3_data_file_location}")
    result = result + [
        f"Items unidentified: {unidentified_count}",
        f"Items missing properties: {missing_properties_count}",
        f"Items with extraneous properties: {extraneous_properties_count}",
        f"Other errors: {unclassified_error_count}",
        f"Exceptions: {exception_count}",
    ]
    if s3_details_location:
        result.append(f"Details: {s3_details_location}")

    return result
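
# Example (hypothetical errors and file name, for illustration only):
#
#     summary_of_data_validation_errors(
#         {"errors": [{"type": "file_format", "unidentified": True, "index": 2,
#                      "identifying_properties": ["uuid", "file_format"]}]},
#         data_file_name="submission.xlsx")
#     # ==> ["Ingestion data validation error summary:",
#     #      "Data file: submission.xlsx",
#     #      "Items unidentified: 1",
#     #      "Items missing properties: 0",
#     #      "Items with extraneous properties: 0",
#     #      "Other errors: 0",
#     #      "Exceptions: 0"]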