• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

4dn-dcic / utils / 6736812910

02 Nov 2023 06:32PM UTC coverage: 78.593% (+0.008%) from 78.585%
6736812910

push

github

web-flow
Merge pull request #292 from 4dn-dcic/smaht-ingestion-related

Misc changes related to SMaHT ingestion.

24 of 24 new or added lines in 3 files covered. (100.0%)

9340 of 11884 relevant lines covered (78.59%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.77
/dcicutils/bundle_utils.py
1
import copy
1✔
2

3
from typing import Any, Dict, List, Optional, Tuple, Union
1✔
4
from .common import AnyJsonData
1✔
5
from .env_utils import EnvUtils, public_env_name
1✔
6
from .ff_utils import get_metadata
1✔
7
from .misc_utils import AbstractVirtualApp, ignored, ignorable, PRINT, to_camel_case
1✔
8
from .sheet_utils import (
1✔
9
    LoadTableError, prefer_number, TabbedJsonSchemas,
10
    Header, Headers, TabbedHeaders, ParsedHeader, ParsedHeaders, TabbedParsedHeaders, SheetCellValue, TabbedSheetData,
11
    TableSetManagerRegistry, AbstractTableSetManager, InsertsManager, TableSetManager, load_table_set,
12
)
13
from .validation_utils import SchemaManager, validate_data_against_schemas
1✔
14

15

16
PatchPrototype = Dict
1✔
17
TabbedPatchPrototypes = Dict[str, PatchPrototype]
1✔
18

19

20
class TypeHintContext:
    """
    Interface for the services a type hint may need from whatever is driving the checking.

    Every operation here raises NotImplementedError; a subclass is expected to supply real versions.
    """

    @classmethod
    def schema_exists(cls, schema_name: str) -> bool:  # noQA - PyCharm complains wrongly about return value
        """Not implemented at this level; always raises NotImplementedError."""
        ignored(schema_name)
        message = f"{cls.__name__}.schema_exists(...) is not implemented."
        raise NotImplementedError(message)

    @classmethod
    def validate_ref(cls, item_type: str, item_ref: str) -> str:  # noQA - PyCharm complains wrongly about return value
        """Not implemented at this level; always raises NotImplementedError."""
        ignored(item_type, item_ref)
        message = f"{cls.__name__}.validate_ref(...) is not implemented."
        raise NotImplementedError(message)

    @classmethod
    def note_problem(cls, problem: str):
        """Not implemented at this level; always raises NotImplementedError."""
        ignored(problem)
        message = f"{cls.__name__}.note_problem(...) is not implemented."
        raise NotImplementedError(message)

    def __str__(self):
        # Shows the concrete class name and the object's id, e.g. <TypeHintContext 4387654321>
        class_name = self.__class__.__name__
        return f"<{class_name} {id(self)}>"
39

40

41
class ValidationProblem(Exception):
    """
    Raised to report one or more problems found while checking data.

    :param problems: a description of what went wrong. Within this module it is constructed both with a
        single message string and with a list of message strings. The value is kept on the
        'problems' attribute exactly as given.
    """

    def __init__(self, problems: Optional[Union[str, List[str], dict]] = None):
        # Forward the payload to Exception so that str(e) and e.args are meaningful regardless of
        # whether the argument was passed positionally or by keyword. When nothing was given we
        # leave args empty so str(e) stays ''.
        if problems is None:
            super().__init__()
        else:
            super().__init__(problems)
        self.problems = problems
44

45

46
class TypeHint:
    """
    Base class for hints that can coerce (or validate) a raw value.

    The base implementation performs no coercion; subclasses override apply_hint.
    """

    def apply_hint(self, value):
        """Returns the given value unchanged."""
        return value

    def __str__(self):
        return f"<{self.__class__.__name__}>"

    def __repr__(self):
        return self.__str__()


class BoolHint(TypeHint):
    """A hint that turns the strings listed in TRUE_VALUES / FALSE_VALUES (in any case) into booleans."""

    # We could use other ways to do this, such as initial substring, but this is more likely to be right.
    # Then again, we might want to consider accepting others like 'yes/no', 'y/n', 'on/off', '1/0'.
    TRUE_VALUES = ['true', 't']
    FALSE_VALUES = ['false', 'f']

    def apply_hint(self, value):
        """
        Returns True or False for a recognized non-empty string, otherwise the value unchanged.
        """
        if isinstance(value, str) and value:
            l_value = value.lower()
            if l_value in self.TRUE_VALUES:
                return True
            elif l_value in self.FALSE_VALUES:
                return False
        return super().apply_hint(value)


class NumHint(TypeHint):
    """A hint that asks prefer_number to convert non-empty strings to the preferred kind of number."""

    # Maps a schema's declared type to the 'kind' argument that is handed to prefer_number.
    PREFERENCE_MAP = {'number': 'num', 'integer': 'int', 'float': 'float'}

    def __init__(self, declared_type: Optional[str] = None):
        """
        :param declared_type: one of the keys of PREFERENCE_MAP, or None to mean 'number'.
            Any other value leaves preferred_type as None, in which case apply_hint does no conversion.
        """
        if declared_type is None:
            # The default has to be a KEY of PREFERENCE_MAP. (Defaulting to 'num', which is a value
            # rather than a key, made the lookup miss and silently disabled the default hint.)
            declared_type = 'number'
        self.preferred_type = self.PREFERENCE_MAP.get(declared_type)

    def apply_hint(self, value):
        """
        Returns prefer_number(value, kind=<preferred type>) for a non-empty string when a preferred
        type is known, otherwise the value unchanged.
        """
        if isinstance(value, str) and value and self.preferred_type:
            return prefer_number(value, kind=self.preferred_type)
        return super().apply_hint(value)


class EnumHint(TypeHint):
    """
    A hint that canonicalizes a string to one of a set of enumerated values.

    The value_map is expected to map lowercase keys to the canonical spelling of each value
    (that is how it is built elsewhere in this module).
    """

    def __init__(self, value_map):
        self.value_map = value_map

    def __str__(self):
        return f"<EnumHint {','.join(f'{key}={val}' for key, val in self.value_map.items())}>"

    def apply_hint(self, value):
        """
        Returns the canonical value for a non-empty string that matches a key of the value_map
        exactly, or case-insensitively, or as an unambiguous case-insensitive prefix.
        Anything else (including the empty string and an ambiguous prefix) is returned unchanged.
        """
        # An empty string is a prefix of every key, so without the 'and value' test an empty cell
        # would be coerced to the enum's only value whenever the enum has exactly one value.
        if isinstance(value, str) and value:
            if value in self.value_map:
                return self.value_map[value]
            lvalue = value.lower()
            if lvalue in self.value_map:
                # A full case-insensitive match wins even if it is also a prefix of other keys.
                return self.value_map[lvalue]
            found = [lkey for lkey in self.value_map if lkey.startswith(lvalue)]
            if len(found) == 1:
                [only_found] = found
                return self.value_map[only_found]
        return super().apply_hint(value)
116

117

118
class RefHint(TypeHint):
    """
    A hint for values that name another item (a schema 'linkTo').

    Applying the hint does not change the value; it asks the context to validate the reference
    and raises ValidationProblem if the context reports that it cannot.
    """

    def __init__(self, schema_name: str, context: TypeHintContext):
        self.schema_name = schema_name
        self.context = context

    def __str__(self):
        return f"<RefHint {self.schema_name} context={self.context}>"

    def apply_hint(self, value):
        """
        Returns the value unchanged if the context validates it as a reference to self.schema_name.

        :raises ValidationProblem: if the context's validate_ref returns a false value.
        """
        is_valid = self.context.validate_ref(item_type=self.schema_name, item_ref=value)
        if is_valid:
            return value
        raise ValidationProblem(f"Unable to validate {self.schema_name} reference: {value!r}")
131

132

133
class OptionalTypeHints:
    """
    A lookup table of type hints that can be indexed either by column position (an int)
    or by breadcrumbs (a tuple, such as a parsed header converted to a tuple).

    Positional hints are fixed at creation time. Hints keyed by breadcrumbs may be added later,
    but an existing key may not be redefined.
    """

    def __init__(self, positional_hints: Optional[List[Optional["TypeHint"]]] = None,
                 positional_breadcrumbs: Optional[List[Union[List, Tuple]]] = None):
        """
        :param positional_hints: the hint (or None) for each column, by position
        :param positional_breadcrumbs: for each column, the breadcrumbs (list or tuple) under which the
            corresponding positional hint should also be findable. Only used if both arguments are non-empty.
        :raises Exception: if both arguments are non-empty but of different lengths
        """
        self.other_hints: Dict[Any, "TypeHint"] = {}
        self.positional_hints: List[Optional["TypeHint"]] = [] if positional_hints is None else positional_hints
        if positional_breadcrumbs and positional_hints:
            if len(positional_breadcrumbs) != len(positional_hints):
                raise Exception("positional_hints and positional_breadcrumbs must have the same length.")
            for breadcrumbs, hint in zip(positional_breadcrumbs, positional_hints):
                # For convenience, we accept each of these as a list or tuple,
                # but it must be a tuple to be usable as a dictionary key.
                self.other_hints[tuple(breadcrumbs)] = hint

    def __getitem__(self, key: Any) -> Optional["TypeHint"]:
        """
        For enumerated positional information, we consult our initial type vector.
        For other situations, we do a general lookup of the hint in our lookup table.

        :raises ValueError: for a negative position or a key that is neither an int nor a tuple
        """
        if isinstance(key, int):
            if key < 0:
                raise ValueError(f"Negative hint positions are not allowed: {key}")
            hints = self.positional_hints
            # A position past the end of the vector just means there is no hint for that column.
            return hints[key] if key < len(hints) else None
        if isinstance(key, tuple):  # a parsed header (or schema breadcrumbs)
            return self.other_hints.get(key)
        raise ValueError(f"Key of unexpected type for OptionalTypeHints: {key}")

    def __setitem__(self, key: Any, value: "TypeHint"):
        """
        Adds a hint under a breadcrumbs tuple that has not already been defined.

        :raises ValueError: if the key is an int, is not a tuple, or is already defined
        """
        if isinstance(key, int):
            raise ValueError(f"Cannot assign OptionalTypeHints by position after initial creation: {key!r}")
        # The type is checked before the membership test so that an unhashable key such as a list
        # gets the intended ValueError rather than a TypeError from the dictionary lookup.
        if not isinstance(key, tuple):
            raise ValueError(f"Attempt to set an OptionalTypeHints key to other than a breadcrumbs tuple: {key!r}")
        if key in self.other_hints:
            raise ValueError(f"Attempt to redefine OptionalTypeHint key {key!r}.")
        self.other_hints[key] = value
178

179

180
class AbstractStructureManager(AbstractTableSetManager):
    """A subclass of AbstractTableSetManager that adds no behavior of its own."""
183

184

185
class ItemTools:
    """
    Implements operations on table-related data without pre-supposing the specific representation of the table.
    It is assumed this can be used for data that was obtained from .json, .csv, .tsv, and .xlsx files because
    it does not presuppose the source of the data nor where it will be written to.

    For the purpose of this class:

    * a 'header' is a string representing the top of a column.

    * a 'parsed header' is a list of strings and/or ints, after splitting at uses of '#' or '.', so that
      "a.b.c" is represented as ["a", "b", "c"], and "x.y#0" is represented as ["x", "y", 0], and representing
      each numeric token as an int instead of a string.

    * a 'headers' object is just a list of strings, each of which is a 'header'.

    * a 'parsed headers' object is a non-empty list of lists, each of which is a 'parsed header'.
      e.g., the headers ["a.b.c", "x.y#0"] are represented as parsed headers [["a", "b", "c"], ["x", "y", 0]].

    """

    @classmethod
    def parse_sheet_header(cls, header: Header) -> ParsedHeader:
        """
        Splits a header at each '.' or '#', discarding empty tokens and turning all-decimal tokens into ints.

        :param header: a header string such as "x.y#0"
        :return: the parsed header, such as ["x", "y", 0]
        """
        # Both separators mean the same thing here, so normalize to one of them before splitting.
        tokens = header.replace('#', '.').split('.')
        # str.isdecimal() is used rather than str.isdigit() because isdigit() also accepts characters
        # such as superscript digits that int() refuses to convert.
        return [int(token) if token.isdecimal() else token
                for token in tokens
                if token]

    @classmethod
    def parse_sheet_headers(cls, headers: Headers) -> ParsedHeaders:
        """Returns a list of the result of parse_sheet_header for each of the given headers."""
        return [cls.parse_sheet_header(header)
                for header in headers]

    @classmethod
    def compute_patch_prototype(cls, parsed_headers: ParsedHeaders):
        """
        Builds an empty nested structure (dicts, lists and None leaves) with a slot for every parsed header.

        :param parsed_headers: the parsed headers of a table
        :return: a dictionary shaped as the headers require, with None at each leaf
        :raises LoadTableError: if a header is empty, begins with a number, or skips a list position
        """
        prototype = {}
        for parsed_header in parsed_headers:
            if not parsed_header:
                # e.g., a header of "" or "." parses to no tokens at all.
                raise LoadTableError("A header cannot be empty.")
            parsed_header0 = parsed_header[0]
            if isinstance(parsed_header0, int):
                raise LoadTableError(f"A header cannot begin with a numeric ref: {parsed_header0}")
            cls.assure_patch_prototype_shape(parent=prototype, keys=parsed_header)
        return prototype

    @classmethod
    def assure_patch_prototype_shape(cls, *, parent: Union[Dict, List], keys: ParsedHeader):
        """
        Makes sure the given parent has a place to hold whatever the given keys will address, creating
        empty containers (or None leaves) as needed, and then recursively does the same one level down.

        :param parent: the dict or list to extend in place
        :param keys: a non-empty parsed header (or the tail of one), relative to parent
        :return: the (modified) parent
        :raises LoadTableError: if a numeric key would leave a gap in a list
        """
        [key0, *more_keys] = keys
        key1 = more_keys[0] if more_keys else None
        # What goes in the slot for key0 depends on how the NEXT key will index into it.
        if isinstance(key1, int):
            placeholder = []
        elif isinstance(key1, str):
            placeholder = {}
        else:
            placeholder = None
        if isinstance(key0, int):
            n = len(parent)
            if key0 == n:
                parent.append(placeholder)
            elif key0 > n:
                raise LoadTableError("Numeric items must occur sequentially.")
            # Otherwise key0 < n, so the slot exists already.
        elif isinstance(key0, str):
            if key0 not in parent:
                parent[key0] = placeholder
        if key1 is not None:
            cls.assure_patch_prototype_shape(parent=parent[key0], keys=more_keys)
        return parent

    INSTAGUIDS_ENABLED = False  # Experimental feature not enabled by default

    @classmethod
    def parse_item_value(cls, value: SheetCellValue,
                         apply_heuristics: bool = False, split_pipe: bool = False) -> AnyJsonData:
        """
        Returns the item value unmodified, unless apply_heuristics=True is given,
        in which case heuristics ARE applied. This is intended to be used for spreadsheet
        values that look like non-strings and should perhaps be interpreted as such.

        This is a vestige of an older plan to have these things happen magically behind the scenes early in
        the process. Unfortunately, that was found to impede correct processing later, so now this is disabled
        by default. It may still be useful in some cases when dealing with data that has no schema, so the
        functionality is still here and must be explicitly requested.

        :param value: a value in a table (such as a spreadsheet)
        :param apply_heuristics: whether to apply heuristic coercions based on what the value looks like (default False)
        :param split_pipe: whether to apply the 'split pipe' heuristic, changing 'a|1' to ['a', 1]. This is
           consulted only when apply_heuristics=True was given (default False)
        """
        if not apply_heuristics:
            # In order to not interfere with schema-driven processing, we mostly default to
            # NOT applying heuristics. You have to ask for them explicitly if you want them.
            # -kmp 23-Oct-2023
            return value
        if not isinstance(value, str):  # presumably a number (int or float)
            return value
        lvalue = value.lower()
        if lvalue == 'true':
            return True
        if lvalue == 'false':
            return False
        if lvalue == 'null' or lvalue == '':
            return None
        if split_pipe and '|' in value:
            if value == '|':  # Use '|' for []
                return []
            if value.endswith("|"):  # Use 'foo|' for ['foo']
                value = value[:-1]
            return [cls.parse_item_value(subvalue, apply_heuristics=apply_heuristics, split_pipe=split_pipe)
                    for subvalue in value.split('|')]
        # Doug points out that the schema might not agree, might want a string representation of a number.
        # At this semantic layer, this might be a bad choice.
        return prefer_number(value)

    @classmethod
    def set_path_value(cls, datum: Union[List, Dict], path: ParsedHeader, value: Any, force: bool = False):
        """
        Stores a value in a nested structure at the place addressed by the given path.

        :param datum: the structure (as made by compute_patch_prototype) to modify in place
        :param path: a parsed header saying where in datum to store the value
        :param value: the value to store
        :param force: if false (the default), a value of None or '' is not stored at all
        """
        if (value is None or value == '') and not force:
            return
        [key, *more_path] = path
        if not more_path:
            datum[key] = value
        else:
            # The force flag must travel with the recursion or else a forced None or '' would be
            # dropped again at the next level down for any path longer than one key.
            cls.set_path_value(datum[key], more_path, value, force=force)

    @classmethod
    def find_type_hint_for_subschema(cls, subschema: Any, context: Optional[TypeHintContext] = None):
        """
        Returns a type hint suitable for a value described by the given subschema, or None.

        :param subschema: the part of a schema describing a single property, or None
        :param context: used to check that a 'linkTo' target exists, and given to any RefHint that is made.
            If no context is given, no RefHint can be made.
        """
        if subschema is not None:
            t = subschema.get('type')
            if t == 'string':
                enum = subschema.get('enum')
                if enum:
                    mapping = {e.lower(): e for e in enum}
                    return EnumHint(mapping)
                link_to = subschema.get('linkTo')
                # The context is optional, so we have to check that we have one before asking it anything.
                if link_to and context is not None and context.schema_exists(link_to):
                    return RefHint(schema_name=link_to, context=context)
            elif t in ('integer', 'float', 'number'):
                return NumHint(declared_type=t)
            elif t == 'boolean':
                return BoolHint()
        return None

    @classmethod
    def find_type_hint_for_parsed_header(cls, parsed_header: Optional[ParsedHeader], schema: Any,
                                         context: Optional[TypeHintContext] = None):
        """
        Walks down the schema's 'properties' following the string keys of the parsed header and returns
        a type hint for the property found at the end, or None if there isn't one.

        Only 'object' schemas are traversed, so a header containing a numeric key yields None.

        :param parsed_header: the parsed header to find a hint for (None or empty yields None)
        :param schema: the schema at which to begin the search
        :param context: passed along to find_type_hint_for_subschema
        """

        def finder(subheader, subschema):
            if not subheader:
                return None
            [key1, *other_headers] = subheader
            if isinstance(key1, str) and isinstance(subschema, dict) and subschema.get('type') == 'object':
                subsubschema = subschema.get('properties', {}).get(key1)
                if other_headers:
                    return finder(subheader=other_headers, subschema=subsubschema)
                return cls.find_type_hint_for_subschema(subsubschema, context=context) or None
            return None

        return finder(subheader=parsed_header, subschema=schema)
356

357

358
ITEM_MANAGER_REGISTRY = TableSetManagerRegistry()
1✔
359

360

361
class InflatableTabbedDataManager:
    """
    This tool can be used independently of the item tools. It doesn't involve schemas, but it does allow the
    inflation of a table with dotted names to structures. e.g., a table with headers mother.name, mother.age,
    father.name, and father.age, as in
      data = load_table_set(<some-file>)
    to bring in the flat representation with:
      {"mother.name": <mother.name>, "mother.age": <mother.age>, ...}
    one can use inflate(data) to get:
      {"mother": {"name": <mother.name>, "age": <mother.age>},
       "father": {"name": <father.name>, "age": <father.age>}}
    Note, too, that although data != inflate(data), once inflated, inflate(inflate(data)) == inflate(data).
    """

    def __init__(self, tabbed_sheet_data: TabbedSheetData, apply_heuristics: bool = False):
        """
        :param tabbed_sheet_data: a dictionary mapping each tab name to a list of (flat) row dictionaries
        :param apply_heuristics: passed to ItemTools.parse_item_value for every cell (default False)
        """
        self.tabbed_sheet_data: TabbedSheetData = tabbed_sheet_data
        self.apply_heuristics = apply_heuristics
        self.headers_by_tab_name: TabbedHeaders = InsertsManager.extract_tabbed_headers(tabbed_sheet_data)
        # First parse the headers of every tab ...
        all_parsed_headers: TabbedParsedHeaders = {}
        for name, tab_headers in self.headers_by_tab_name.items():
            all_parsed_headers[name] = ItemTools.parse_sheet_headers(tab_headers)
        self.parsed_headers_by_tab_name: TabbedParsedHeaders = all_parsed_headers
        # ... and then work out, for every tab, the empty structure that each of its rows will fill in.
        all_prototypes: TabbedPatchPrototypes = {}
        for name, tab_parsed_headers in self.parsed_headers_by_tab_name.items():
            all_prototypes[name] = ItemTools.compute_patch_prototype(tab_parsed_headers)
        self.patch_prototypes_by_tab_name: TabbedPatchPrototypes = all_prototypes

    @property
    def tab_names(self):
        """A list of the tab names, which are the keys of the tabbed sheet data."""
        return list(self.tabbed_sheet_data.keys())

    def inflate_tabs(self):
        """Returns a dictionary mapping each tab name to the list of its inflated rows."""
        inflated_tabs = {}
        for name in self.tab_names:
            inflated_tabs[name] = self.inflate_tab(name)
        return inflated_tabs

    def inflate_tab(self, tab_name: str):
        """Returns a list of the inflated form of each row of the named tab."""
        tab_prototype = self.patch_prototypes_by_tab_name[tab_name]
        tab_parsed_headers = self.parsed_headers_by_tab_name[tab_name]
        return [self.inflate_row(row, prototype=tab_prototype, parsed_headers=tab_parsed_headers)
                for row in self.tabbed_sheet_data[tab_name]]

    def inflate_row(self, row: Dict, *, prototype: Dict, parsed_headers: ParsedHeaders):
        """
        Returns a copy of the prototype filled in from the values of the given row.

        The row's values are matched to the parsed headers by position.
        """
        patch_item = copy.deepcopy(prototype)
        for position, cell in enumerate(row.values()):
            cell_value = ItemTools.parse_item_value(cell, apply_heuristics=self.apply_heuristics)
            ItemTools.set_path_value(patch_item, parsed_headers[position], cell_value)
        return patch_item

    @classmethod
    def inflate(cls, tabbed_sheet_data: TabbedSheetData, apply_heuristics: bool = False):
        """Creates an instance for the given data and returns the result of its inflate_tabs()."""
        return cls(tabbed_sheet_data, apply_heuristics=apply_heuristics).inflate_tabs()
415

416

417
inflate = InflatableTabbedDataManager.inflate
1✔
418

419

420
def load_table_structures(filename: str, *, apply_heuristics: bool = True,
                          tab_name: Optional[str] = None, escaping: Optional[bool] = None, **kwargs):
    """
    This differs from load_table_set only in that it inflates the content. It does not apply schemas.

    :param filename: passed to load_table_set
    :param apply_heuristics: passed to inflate. Note that the default here is True.
    :param tab_name: passed to load_table_set
    :param escaping: passed to load_table_set
    :param kwargs: any other keyword arguments are passed to load_table_set
    :return: the result of inflating what load_table_set returned
    """
    return inflate(load_table_set(filename=filename, tab_name=tab_name, escaping=escaping, **kwargs),
                   apply_heuristics=apply_heuristics)
426

427

428
class TableChecker(InflatableTabbedDataManager, TypeHintContext):
    """
    Inflates tabbed sheet data as InflatableTabbedDataManager does but, for flattened data, also applies
    schema-derived type hints to each cell, collecting any ValidationProblem raised along the way.

    This class also serves as the TypeHintContext for the hints that it creates.
    """

    def __init__(self, tabbed_sheet_data: TabbedSheetData, *, flattened: bool,
                 override_schemas: Optional[TabbedJsonSchemas] = None,
                 portal_env: Optional[str] = None, portal_vapp: Optional[AbstractVirtualApp] = None,
                 apply_heuristics: bool = False):
        """
        :param tabbed_sheet_data: a dictionary mapping each tab name to a list of row dictionaries
        :param flattened: whether the rows are flat (spreadsheet-like) and so need inflating and hinting
        :param override_schemas: passed to the SchemaManager
        :param portal_env: the portal environment to consult. If neither this nor portal_vapp is given,
            the public name of EnvUtils.PRD_ENV_NAME is used.
        :param portal_vapp: a virtual app to use for consulting the portal
        :param apply_heuristics: passed to ItemTools.parse_item_value for every cell (default False)
        """

        self.flattened = flattened
        # if not flattened:
        #     # TODO: Need to implement something that depends on this flattened attribute.
        #     # Also, it's possible that we can default this once we see if the new strategy is general-purpose,
        #     # rather than it being a required argument. But for now let's require it be passed.
        #     # -kmp 25-Oct-2023
        #     raise ValueError("Only flattened=True is supported by TableChecker for now.")

        if portal_env is None and portal_vapp is None:
            portal_env = public_env_name(EnvUtils.PRD_ENV_NAME)
        # InflatableTabbedDataManager supplies:
        #   self.tabbed_sheet_data: TabbedSheetData =
        #   self.headers_by_tab_name: TabbedHeaders =
        #   self.parsed_headers_by_tab_name: TabbedParsedHeaders =
        #   self.patch_prototypes_by_tab_name: TabbedPatchPrototypes =
        self._problems: List[str] = []
        super().__init__(tabbed_sheet_data=tabbed_sheet_data, apply_heuristics=apply_heuristics)
        self.portal_env = portal_env
        self.portal_vapp = portal_vapp
        self.schema_manager: SchemaManager = SchemaManager(portal_env=portal_env, portal_vapp=portal_vapp,
                                                           override_schemas=override_schemas)
        self.schemas = self.schema_manager.fetch_relevant_schemas(self.tab_names)  # , schemas=schemas)
        self.lookup_tables_by_tab_name: Dict[str, Dict[str, Dict]] = {
            tab_name: self.build_lookup_table_for_tab(tab_name, rows=rows)
            for tab_name, rows in tabbed_sheet_data.items()
        }
        # Compiling the hints may call back to this object as a TypeHintContext (e.g., note_problem and
        # schema_exists), which is why self._problems and self.schema_manager are set up before this point.
        self.type_hints_by_tab_name: Dict[str, OptionalTypeHints] = {
            tab_name: self.compile_type_hints(tab_name)
            for tab_name in self.tab_names
        }

    def schema_for_tab(self, tab_name: str) -> dict:
        """
        Returns the schema for the named tab.

        :raises ValueError: if there is no schema for that tab
        """
        # Once our class is initialized, every tab should have a schema, even if just {}
        schema = self.schemas.get(tab_name)
        if schema is None:
            raise ValueError(f"No schema was given or fetched for tab {tab_name!r}.")
        return schema

    def note_problem(self, problem: str):
        """Records a problem to be reported later by raise_any_pending_problems."""
        self._problems.append(problem)

    def build_lookup_table_for_tab(self, tab_name: str, *, rows: List[Dict]) -> Dict[str, Dict]:
        """
        Returns a dictionary that maps the string form of each non-empty identifying property value
        found in the given rows to the row in which it was found.

        :param tab_name: the name of the tab, which is used as the schema name when asking the
            schema manager for the identifying properties
        :param rows: the rows of that tab
        """
        identifying_properties = self.schema_manager.identifying_properties(schema_name=tab_name)
        if not identifying_properties:
            # Maybe issue a warning here, since with no identifying properties we have no way
            # to look up any of this tab's rows by reference.
            empty_lookup_table: Dict[str, Dict] = {}
            return empty_lookup_table
        lookup_table: Dict[str, Dict] = {}
        for row in rows:
            for identifying_property in identifying_properties:
                value = row.get(identifying_property)
                if value != '' and value is not None:
                    lookup_table[str(value)] = row
        return lookup_table

    def contains_ref(self, item_type, item_ref):
        """Returns True if resolve_ref finds a row for the given reference, and otherwise False."""
        return self.resolve_ref(item_type=item_type, item_ref=item_ref) is not None

    def resolve_ref(self, item_type, item_ref):
        """
        Returns the row that the given reference identifies among the rows being checked, or None.

        :param item_type: the type of item, which is expected to be the name of a tab
        :param item_ref: the identifying value to look for
        """
        lookup_table = self.lookup_tables_by_tab_name.get(item_type)
        if not lookup_table or item_ref is None:
            # Either a stray type that is not in our tables (or has nothing to look up), or no reference.
            return None
        # The keys of the lookup table were made with str(value), so the lookup has to agree with that,
        # or else a reference that is not already a string could never match.
        return lookup_table.get(str(item_ref)) or None

    def raise_any_pending_problems(self):
        """
        Prints each problem noted so far and then raises them all together.

        :raises ValidationProblem: if any problems have been noted
        """
        problems = self._problems
        if problems:
            for problem in problems:
                PRINT(f"Problem: {problem}")
            raise ValidationProblem(problems)
            # raise Exception(there_are(problems, kind='problem while compiling hints', tense='past', show=False))

    def check_tabs(self):
        """
        Returns a dictionary mapping each tab name to the list of its checked rows.

        :raises ValidationProblem: if any problems were noted along the way
        """
        result = {tab_name: self.check_tab(tab_name)
                  for tab_name in self.tab_names}
        # At this point, doing the checking will have already raised certain errors, if those errors interfere
        # with continued checking, but some smaller problems may have been deferred until the end, so we have to
        # check for and raise an error for any such pending problems now.
        self.raise_any_pending_problems()
        return result

    def validate_ref(self, item_type, item_ref):
        """
        Returns True if the reference identifies one of the rows being checked. Otherwise asks the portal
        for the item and returns whether a dictionary containing a 'uuid' came back.
        Any exception raised while asking the portal is treated as a failure to validate.
        """
        if self.contains_ref(item_type=item_type, item_ref=item_ref):
            return True
        try:
            # TODO: This probably needs a cache
            info = get_metadata(f"/{to_camel_case(item_type)}/{item_ref}",
                                ff_env=self.portal_env, vapp=self.portal_vapp)
            # Basically return True if there's a value at all,
            # but still check it's not an error message that didn't get raised.
            return isinstance(info, dict) and 'uuid' in info
        except Exception:
            # Deliberately broad: whatever went wrong, the reference could not be confirmed.
            return False

    def schema_exists(self, schema_name: str) -> bool:
        """Returns what the schema manager says about whether the named schema exists."""
        return self.schema_manager.schema_exists(schema_name)

    def check_tab(self, tab_name: str):
        """Returns a list of the result of check_row for each row of the named tab."""
        prototype = self.patch_prototypes_by_tab_name[tab_name]
        parsed_headers = self.parsed_headers_by_tab_name[tab_name]
        type_hints = self.type_hints_by_tab_name[tab_name]
        result = [self.check_row(row, tab_name=tab_name, row_number=row_number, prototype=prototype,
                                 parsed_headers=parsed_headers, type_hints=type_hints)
                  for row_number, row in enumerate(self.tabbed_sheet_data[tab_name])]
        return result

    def check_row(self, row: Dict, *, tab_name: str, row_number: int, prototype: Dict,
                  parsed_headers: ParsedHeaders, type_hints: OptionalTypeHints):
        """Dispatches to check_flattened_row or check_inflated_row according to self.flattened."""
        if self.flattened:
            return self.check_flattened_row(row=row, tab_name=tab_name, row_number=row_number, prototype=prototype,
                                            parsed_headers=parsed_headers, type_hints=type_hints)
        else:
            return self.check_inflated_row(row=row, tab_name=tab_name, row_number=row_number, prototype=prototype,
                                           parsed_headers=parsed_headers, type_hints=type_hints)

    def check_inflated_row(self, row: Dict, *, tab_name: str, row_number: int, prototype: Dict,
                           parsed_headers: ParsedHeaders, type_hints: OptionalTypeHints):
        """Presently returns the row unchanged. Checking of already-inflated rows is not yet implemented."""
        ignorable(self, tab_name, row_number, prototype, parsed_headers, type_hints)  #
        # TODO: Make this work...
        # def traverse(item, *, subschema, breadcrumbs):
        #     if isinstance(item, list):
        #         # check schema here to make sure it's supposed to be a list before proceeding
        #         for i, elem in enumerate(item):
        #             traverse(elem, subschema=..., breadcrumbs=(*breadcrumbs, i))
        #     elif isinstance(item, dict):
        #         # check schema here to make sure it's supposed to be a dict before proceeding
        #         for k, v in item.items():
        #             traverse(v, subschema=..., breadcrumbs=(*breadcrumbs, k))
        #     else:
        #         # look up hint. if there's not a hint for these breadcrumbs, make one
        #         # apply the hint for side-effect, to get an error if we have a bad value
        #         pass
        # schema = self.schemas[tab_name]
        # if schema:
        #     traverse(row, subschema=schema, breadcrumbs=())  # for side-effect
        return row

    def check_flattened_row(self, row: Dict, *, tab_name: str, row_number: int, prototype: Dict,
                            parsed_headers: ParsedHeaders, type_hints: OptionalTypeHints):
        """
        Returns a copy of the prototype filled in from the values of the given row, after applying to each
        value the type hint (if any) for its column. The row's values are matched to columns by position.

        A ValidationProblem raised by a hint is noted (with the tab, row number and column name) rather
        than propagated, and the value is then stored as it was before the hint was applied.
        """
        patch_item = copy.deepcopy(prototype)
        for column_number, column_value in enumerate(row.values()):
            parsed_value = ItemTools.parse_item_value(column_value, apply_heuristics=self.apply_heuristics)
            type_hint = type_hints[column_number]
            if type_hint:
                try:
                    parsed_value = type_hint.apply_hint(parsed_value)
                except ValidationProblem as e:
                    headers = self.headers_by_tab_name[tab_name]
                    column_name = headers[column_number]
                    self.note_problem(f"{tab_name}[{row_number}].{column_name}: {e}")
            ItemTools.set_path_value(patch_item, parsed_headers[column_number], parsed_value)
        return patch_item

    @classmethod
    def check(cls, tabbed_sheet_data: TabbedSheetData, *,
              flattened: bool,
              override_schemas: Optional[TabbedJsonSchemas] = None,
              apply_heuristics: bool = False,
              portal_env: Optional[str] = None, portal_vapp: Optional[AbstractVirtualApp] = None):
        """Creates an instance with the given arguments and returns the result of its check_tabs()."""
        checker = cls(tabbed_sheet_data, flattened=flattened,
                      override_schemas=override_schemas, apply_heuristics=apply_heuristics,
                      portal_env=portal_env, portal_vapp=portal_vapp)
        checked = checker.check_tabs()
        return checked

    class SheetState:
        """Holds the parsed headers and type hints of a single tab."""

        def __init__(self, parsed_headers: ParsedHeaders, type_hints: OptionalTypeHints):
            self.parsed_headers = parsed_headers
            self.type_hints = type_hints

    def compile_type_hints(self, tab_name: str) -> OptionalTypeHints:
        """
        Returns the type hints for the named tab, findable either by column position or by parsed header.

        A tab with no schema gets a hint of None for every column.
        """
        parsed_headers = self.parsed_headers_by_tab_name[tab_name]
        schema = self.schemas.get(tab_name)
        for required_header in self._schema_required_headers(schema):
            if required_header not in parsed_headers:
                # Say which tab and which header, so that the problem can actually be acted upon.
                self.note_problem(f"Missing required header in tab {tab_name!r}: {required_header!r}")
        positional_type_hints = [(ItemTools.find_type_hint_for_parsed_header(parsed_header, schema, context=self)
                                  if schema
                                  else None)
                                 for parsed_header in parsed_headers]
        type_hints = OptionalTypeHints(positional_type_hints, positional_breadcrumbs=parsed_headers)
        return type_hints

    @classmethod
    def _schema_required_headers(cls, schema):
        """Presently always returns an empty list."""
        ignored(schema)
        return []  # TODO: Make this compute a list of required headers (in parsed header form)

    def create_tab_processor_state(self, tab_name: str) -> SheetState:
        """Returns a SheetState holding the parsed headers and type hints of the named tab."""
        # This will create state that allows us to efficiently assign values in the right place on each row
        return self.SheetState(parsed_headers=self.parsed_headers_by_tab_name[tab_name],
                               type_hints=self.type_hints_by_tab_name[tab_name])
633

634

635
check = TableChecker.check
1✔
636

637

638
def load_items(filename: str, tab_name: Optional[str] = None, escaping: Optional[bool] = None,
               override_schemas: Optional[TabbedJsonSchemas] = None, apply_heuristics: bool = False,
               portal_env: Optional[str] = None, portal_vapp: Optional[AbstractVirtualApp] = None,
               # TODO: validate= is presently False (i.e., disabled) by default while being debugged,
               #       but for production use maybe should not be? -kmp 25-Oct-2023
               validate: bool = False,
               **kwargs):
    """
    Loads the items in the given file, running flattened (spreadsheet-like) data through TableChecker.

    :param filename: passed to TableSetManager.load_annotated
    :param tab_name: passed to TableSetManager.load_annotated
    :param escaping: passed to TableSetManager.load_annotated
    :param override_schemas: passed to TableChecker.check and to validate_data_against_schemas
    :param apply_heuristics: passed to TableChecker.check
    :param portal_env: passed to TableChecker.check and to validate_data_against_schemas
    :param portal_vapp: passed to TableChecker.check and to validate_data_against_schemas
    :param validate: whether to also validate the items against schemas (default False)
    :param kwargs: any other keyword arguments are passed to TableSetManager.load_annotated
    :return: the items or, if validate=True was given, a 2-tuple of the items and the validation problems
    """
    loaded = TableSetManager.load_annotated(filename=filename, tab_name=tab_name, escaping=escaping,
                                            prefer_number=False, **kwargs)
    rows_by_tab = loaded['content']
    is_flattened = loaded['flattened']
    # No fancy checking for things like .json, etc. for now. Only check things that came from
    # spreadsheet-like data, where structural datatypes are forced into strings.
    items = (TableChecker.check(rows_by_tab, flattened=is_flattened,
                                override_schemas=override_schemas,
                                portal_env=portal_env, portal_vapp=portal_vapp,
                                apply_heuristics=apply_heuristics)
             if is_flattened
             else rows_by_tab)
    if not validate:
        return items
    problems = validate_data_against_schemas(items, portal_env=portal_env, portal_vapp=portal_vapp,
                                             override_schemas=override_schemas)
    return items, problems
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc