• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bogdandm / json2python-models / 11497710154

24 Oct 2024 10:40AM UTC coverage: 90.904% (-7.2%) from 98.072%
11497710154

Pull #59

github

bkalashnikov
Enable pytest workers
Pull Request #59: Modernize project setup and setup cron action job

2 of 6 new or added lines in 2 files covered. (33.33%)

115 existing lines in 7 files now uncovered.

1479 of 1627 relevant lines covered (90.9%)

4.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.33
/json_to_models/cli.py
1
import argparse
5✔
2
import configparser
5✔
3
import importlib
5✔
4
import itertools
5✔
5
import json
5✔
6
import os.path
5✔
7
import re
5✔
8
import sys
5✔
9
from collections import defaultdict
5✔
10
from datetime import datetime
5✔
11
from pathlib import Path
5✔
12
from typing import Any, Callable, Dict, Generator, Iterable, List, Tuple, Type, Union
5✔
13

14
from .models.sqlmodel import SqlModelCodeGenerator
5✔
15

16
# Pick a YAML loader at import time: ruamel.yaml is preferred, PyYAML is the fallback.
# When neither package is installed both `yaml` and `yaml_load` are set to None.
try:
    import ruamel.yaml as yaml

    yaml_load = yaml.YAML(typ='safe', pure=True).load
except ImportError:
    try:
        import yaml

        yaml_load = yaml.safe_load
    except ImportError:
        yaml = None
        # `FileLoaders.yaml` tests `yaml_load is None`, so the name has to exist
        # even without a parser; otherwise that check fails with NameError.
        yaml_load = None
×
26

27
from . import __version__ as VERSION
5✔
28
from .dynamic_typing import ModelMeta, register_datetime_classes, registry
5✔
29
from .generator import MetadataGenerator
5✔
30
from .models import ModelsStructureType
5✔
31
from .models.attr import AttrsModelCodeGenerator
5✔
32
from .models.base import GenericModelCodeGenerator, generate_code
5✔
33
from .models.dataclasses import DataclassModelCodeGenerator
5✔
34
from .models.pydantic import PydanticModelCodeGenerator
5✔
35
from .models.structure import compose_models, compose_models_flat
5✔
36
from .registry import (
5✔
37
    ModelCmp, ModelFieldsEquals, ModelFieldsNumberMatch, ModelFieldsPercentMatch, ModelRegistry
38
)
39
from .utils import convert_args
5✔
40

41
STRUCTURE_FN_TYPE = Callable[[Dict[str, ModelMeta]], ModelsStructureType]
5✔
42
def bool_js_style(s: str) -> Union[bool, None]:
    """
    Convert JS-style boolean literal to python bool

    :param s: String value
    :return: True for "true", False for "false", None for any other value
    """
    return {"true": True, "false": False}.get(s, None)
5✔
43

44

45
class Cli:
5✔
46
    MODEL_CMP_MAPPING = {
5✔
47
        "percent": convert_args(ModelFieldsPercentMatch, lambda s: float(s) / 100),
48
        "number": convert_args(ModelFieldsNumberMatch, int),
49
        "exact": ModelFieldsEquals
50
    }
51

52
    STRUCTURE_FN_MAPPING: Dict[str, STRUCTURE_FN_TYPE] = {
5✔
53
        "nested": compose_models,
54
        "flat": compose_models_flat
55
    }
56

57
    MODEL_GENERATOR_MAPPING: Dict[str, Type[GenericModelCodeGenerator]] = {
5✔
58
        "base": convert_args(GenericModelCodeGenerator),
59
        "attrs": convert_args(AttrsModelCodeGenerator, meta=bool_js_style),
60
        "dataclasses": convert_args(DataclassModelCodeGenerator, meta=bool_js_style,
61
                                    post_init_converters=bool_js_style),
62
        "pydantic": convert_args(PydanticModelCodeGenerator),
63
        "sqlmodel": convert_args(SqlModelCodeGenerator),
64
    }
65

66
    def __init__(self):
        """
        Create CLI object with default settings and its argument parser

        Every attribute that ``run`` reads is created here, so the object can also be
        configured through ``setup_models_data`` and ``set_args`` without ``parse_args``.
        """
        self.initialized = False
        self.models_data: Dict[str, Iterable[dict]] = {}  # -m/-l
        self.output_file: str = ""  # -o (empty value: `run` returns the code instead of writing it)
        self.enable_datetime: bool = False  # --datetime
        self.strings_converters: bool = False  # --strings-converters
        self.max_literals: int = -1  # --max-strings-literals
        self.merge_policy: List[ModelCmp] = []  # --merge
        self.structure_fn: STRUCTURE_FN_TYPE = None  # -s
        self.model_generator: Type[GenericModelCodeGenerator] = None  # -f & --code-generator
        self.model_generator_kwargs: Dict[str, Any] = None
        self.dict_keys_regex: Iterable[Any] = ()  # --dict-keys-regex (compiled patterns)
        self.dict_keys_fields: Iterable[str] = ()  # --dict-keys-fields
        self.preamble: Union[str, None] = None  # --preamble

        self.argparser = self._create_argparser()
5✔
78

79
    def parse_args(self, args: List[str] = None):
        """
        Parse list of command line arguments and configure this object from them

        :param args: (Optional) List of arguments, forwarded as is to the argparse parser
        :return: None
        """
        ns = self.argparser.parse_args(args)

        # Loader of input files is the FileLoaders attribute named after the input format
        file_loader = getattr(FileLoaders, ns.input_format)

        # Options that are stored on the object unchanged
        self.output_file = ns.output
        self.enable_datetime = ns.datetime
        self.strings_converters = ns.strings_converters
        self.max_literals = ns.max_strings_literals

        # "name_arg" becomes ["name", "arg"], a plain name stays a string
        merge_policy = []
        for policy in ns.merge:
            merge_policy.append(policy.split("_") if "_" in policy else policy)

        for type_name in ns.disable_str_serializable_types:
            registry.remove_by_name(type_name)

        self.setup_models_data(ns.model or (), ns.list or (), file_loader)
        self.validate(merge_policy, ns.framework, ns.code_generator)
        self.set_args(
            merge_policy, ns.structure, ns.framework, ns.code_generator, ns.code_generator_kwargs,
            ns.dict_keys_regex, ns.dict_keys_fields, ns.disable_unicode_conversion, ns.preamble
        )
112

113
    def run(self):
        """
        Generate models code from the loaded models data

        :return: Generated code, or a short message if the code was written to the output file
        """
        if self.enable_datetime:
            register_datetime_classes()

        meta_generator = MetadataGenerator(
            dict_keys_regex=self.dict_keys_regex,
            dict_keys_fields=self.dict_keys_fields
        )

        # Local registry of models; not the module-level string types `registry`
        models_registry = ModelRegistry(*self.merge_policy)
        for model_name, model_data in self.models_data.items():
            models_registry.process_meta_data(meta_generator.generate(*model_data), model_name)
        models_registry.merge_models(meta_generator)
        models_registry.generate_names()

        models_structure = self.structure_fn(models_registry.models_map)
        header = self.version_string
        output = header + generate_code(
            models_structure,
            self.model_generator,
            class_generator_kwargs=self.model_generator_kwargs,
            preamble=self.preamble
        )

        if not self.output_file:
            return output

        with open(self.output_file, "w", encoding="utf-8") as f:
            f.write(output)
        return f"Output is written to {self.output_file}"
×
139

140
    @property
    def version_string(self):
        """
        Header of the generated file: a raw triple-quoted string with the package version,
        current local time and the command line (``sys.argv``) of this process
        """
        timestamp = datetime.now().ctime()
        command = " ".join(sys.argv)
        lines = (
            'r"""',
            f'generated by json2python-models v{VERSION} at {timestamp}',
            f'command: {command}',
            '"""',
        )
        return "\n".join(lines) + "\n"
148

149
    def validate(self, merge_policy, framework, code_generator):
        """
        Check parsed args for consistency

        :param merge_policy: List of merge policies. Each item is either a policy name
            or a list of policy name followed by policy arguments
        :param framework: Framework name (predefined code generator)
        :param code_generator: Code generator import string
        :return: None
        :raises ValueError: on unknown merge policy name or when --framework and
            --code-generator do not agree with each other
        """
        for policy in merge_policy:
            # Only the name is checked here, policy arguments are converted in `set_args`
            policy_name = policy[0] if isinstance(policy, list) else policy
            if policy_name not in self.MODEL_CMP_MAPPING:
                raise ValueError(f"Invalid merge policy '{policy_name}', choices are {self.MODEL_CMP_MAPPING.keys()}")

        is_custom = framework == 'custom'
        if is_custom and code_generator is None:
            raise ValueError("You should specify --code-generator to support custom generator")
        if not is_custom and code_generator is not None:
            raise ValueError("--code-generator argument has no effect without '--framework custom' argument")
×
169

170
    def setup_models_data(
            self,
            models: Iterable[Union[
                Tuple[str, str],
                Tuple[str, str, str],
            ]],
            models_lists: Iterable[Tuple[str, str, str]],
            parser: 'FileLoaders.T'
    ):
        """
        Load data of all models and store it in ``self.models_data`` grouped by model name

        :param models: ``(name, path)`` or ``(name, lookup, path)`` items (-m argument)
        :param models_lists: ``(name, lookup, path)`` items (-l argument)
        :param parser: Callable that loads content of one file by its path
        :raises RuntimeError: if some item has neither 2 nor 3 elements
        """
        collected: Dict[str, List[dict]] = defaultdict(list)

        for entry in [*models, *models_lists]:
            size = len(entry)
            if size not in (2, 3):
                raise RuntimeError('`--model` argument should contain exactly 2 or 3 strings')

            if size == 2:
                # No lookup given: '-' means that file content is used as is
                (model_name, path_raw), lookup = entry, '-'
            else:
                model_name, lookup, path_raw = entry

            # One path pattern could match several files; they all feed the same model
            for real_path in process_path(path_raw):
                collected[model_name].extend(iter_json_file(parser(real_path), lookup))

        self.models_data = collected
×
199

200
    def set_args(
            self,
            merge_policy: List[Union[List[str], str]],
            structure: str,
            framework: str,
            code_generator: str,
            code_generator_kwargs_raw: List[str],
            dict_keys_regex: List[str],
            dict_keys_fields: List[str],
            disable_unicode_conversion: bool,
            preamble: str,
    ):
        """
        Convert CLI args to python representation and set them to appropriate object attributes

        :param merge_policy: Merge policies. Each item is either a policy name
            or a list of policy name followed by policy arguments
        :param structure: Key of STRUCTURE_FN_MAPPING
        :param framework: Key of MODEL_GENERATOR_MAPPING or "custom"
        :param code_generator: Import path of generator class, only used if framework is "custom"
        :param code_generator_kwargs_raw: NAME=VALUE strings (could be wrapped in double quotes)
        :param dict_keys_regex: Regular expressions, ^ and $ are added to each of them
        :param dict_keys_fields: Fields names, stored as is (empty tuple if nothing is passed)
        :param disable_unicode_conversion: Inverted value is passed to generator as ``convert_unicode``
        :param preamble: Code to insert, stored stripped (None if it is empty)
        :raises ValueError: if --code-generator has no module part or
            if some --code-generator-kwargs item is not in NAME=VALUE format
        """
        self.merge_policy.clear()
        for merge in merge_policy:
            if isinstance(merge, str):
                name, args = merge, ()
            else:
                name, args = merge[0], merge[1:]
            self.merge_policy.append(self.MODEL_CMP_MAPPING[name](*args))

        self.structure_fn = self.STRUCTURE_FN_MAPPING[structure]

        if framework != "custom":
            self.model_generator = self.MODEL_GENERATOR_MAPPING[framework]
        else:
            module, dot, cls = code_generator.rpartition('.')
            if not dot:
                # Without a dot there is no module to import the class from
                raise ValueError(
                    f"--code-generator should be an import path like 'package.module.ClassName', "
                    f"not '{code_generator}'"
                )
            m = importlib.import_module(module)
            self.model_generator = getattr(m, cls)

        self.model_generator_kwargs = dict(
            post_init_converters=self.strings_converters,
            convert_unicode=not disable_unicode_conversion,
            max_literals=self.max_literals
        )
        for raw_item in code_generator_kwargs_raw or ():
            item = raw_item
            # Drop double quotes left around "name=value with space";
            # startswith/endswith are safe for empty strings unlike item[0]/item[-1]
            if item.startswith('"'):
                item = item[1:]
            if item.endswith('"'):
                item = item[:-1]
            name, equals, value = item.partition("=")
            if not equals:
                raise ValueError(
                    f"Invalid --code-generator-kwargs item '{raw_item}', expected format is NAME=VALUE"
                )
            self.model_generator_kwargs[name] = value

        self.dict_keys_regex = [re.compile(rf"^{r}$") for r in dict_keys_regex] if dict_keys_regex else ()
        self.dict_keys_fields = dict_keys_fields or ()
        if preamble:
            preamble = preamble.strip()
        self.preamble = preamble or None
        self.initialized = True
×
254

255
    @classmethod
    def _create_argparser(cls) -> argparse.ArgumentParser:
        """
        ArgParser factory

        :return: Parser with all CLI arguments declared. Choices of --framework and --structure
            are taken from MODEL_GENERATOR_MAPPING and STRUCTURE_FN_MAPPING
        """
        parser = argparse.ArgumentParser(
            formatter_class=argparse.RawTextHelpFormatter,
            description="Convert given json files into Python models."
        )

        parser.add_argument(
            "-m", "--model",
            nargs="+", action="append", metavar=("<Model name> [<JSON lookup>] <File path or pattern>", ""),
            help="Model name and its JSON data as path or unix-like path pattern.\n"
                 "'*',  '**' or '?' patterns symbols are supported.\n\n"
                 "JSON data could be array of models or single model\n\n"
                 "If this file contains dict with nested list than you can pass\n"
                 "<JSON lookup>. Deep lookups are supported by dot-separated path.\n"
                 "If no lookup needed pass '-' as <JSON lookup> (default)\n\n"
        )
        parser.add_argument(
            "-i", "--input-format",
            default="json",
            choices=['json', 'yaml', 'ini'],
            # Both packages are tried at import time, ruamel.yaml first
            help="Input files parser ('ruamel.yaml' or 'PyYaml' is required to parse yaml files)\n\n"
        )
        parser.add_argument(
            "-o", "--output",
            metavar="FILE", default="",
            help="Path to output file\n\n"
        )
        parser.add_argument(
            "-f", "--framework",
            default="base",
            choices=list(cls.MODEL_GENERATOR_MAPPING.keys()) + ["custom"],
            help="Model framework for which python code is generated.\n"
                 "'base' (default) mean no framework so code will be generated without any decorators\n"
                 "and additional meta-data.\n"
                 "If you pass 'custom' you should specify --code-generator argument\n\n"
        )
        parser.add_argument(
            "-s", "--structure",
            default="flat",
            choices=list(cls.STRUCTURE_FN_MAPPING.keys()),
            # Help text has to agree with `default` above
            help="Models composition style. Default is 'flat'.\n"
                 "With 'nested' nested models become nested Python classes.\n\n"
        )
        parser.add_argument(
            "--datetime",
            action="store_true",
            help="Enable datetime/date/time strings parsing.\n"
                 "Warn.: This can lead to 6-7 times slowdown on large datasets.\n"
                 "       Be sure that you really need this option.\n\n"
        )
        parser.add_argument(
            "--strings-converters",
            action="store_true",
            help="Enable generation of string types converters (i.e. IsoDatetimeString or BooleanString).\n\n"
        )
        parser.add_argument(
            "--max-strings-literals",
            type=int,
            default=GenericModelCodeGenerator.DEFAULT_MAX_LITERALS,
            metavar='NUMBER',
            help="Generate Literal['foo', 'bar'] when field have less than NUMBER string constants as values.\n"
                 f"Pass 0 to disable. By default NUMBER={GenericModelCodeGenerator.DEFAULT_MAX_LITERALS}"
                 f" (some generator classes could override it)\n\n"
        )
        parser.add_argument(
            "--disable-unicode-conversion", "--no-unidecode",
            action="store_true",
            help="Disabling unicode conversion in fields and class names.\n\n"
        )

        default_percent = f"{ModelFieldsPercentMatch.DEFAULT * 100:.0f}"
        default_number = f"{ModelFieldsNumberMatch.DEFAULT:.0f}"
        parser.add_argument(
            "--merge",
            default=["percent", "number"],
            nargs="+",
            # argparse applies %-formatting to help strings, so a literal percent sign is written as %%
            help=(
                f"Merge policy settings. Default is 'percent_{default_percent} number_{default_number}' (percent of field match\n"
                "or number of fields match).\n"
                "Possible values are:\n"
                "'percent[_<percent>]' - two models had a certain percentage of matched field names.\n"
                f"                        Default percent is {default_percent}%%. "
                "Custom value could be i.e. 'percent_95'.\n"
                "'number[_<number>]'   - two models had a certain number of matched field names.\n"
                f"                        Default number of fields is {default_number}.\n"
                "'exact'               - two models should have exact same field names to merge.\n\n"
            )
        )
        parser.add_argument(
            "--dict-keys-regex", "--dkr",
            nargs="+", metavar="RegEx",
            help="List of regular expressions (Python syntax).\n"
                 "If all keys of some dict are match one of them\n"
                 "then this dict will be marked as dict field but not nested model.\n"
                 "Note: ^ and $ tokens will be added automatically but you have to\n"
                 "escape other special characters manually.\n"
        )
        parser.add_argument(
            "--dict-keys-fields", "--dkf",
            nargs="+", metavar="FIELD NAME",
            help="List of model fields names that will be marked as dict fields\n\n"
        )
        parser.add_argument(
            "--code-generator",
            help="Absolute import path to GenericModelCodeGenerator subclass.\n"
                 "Works in pair with '-f custom'\n\n"
        )
        parser.add_argument(
            "--code-generator-kwargs",
            metavar="NAME=VALUE",
            nargs="*", type=str,
            help="List of code generator arguments (for __init__ method).\n"
                 "Each argument should be in following format:\n"
                 "    argument_name=value or \"argument_name=value with space\"\n"
                 "Boolean values should be passed in JS style: true | false"
                 "\n\n"
        )
        parser.add_argument(
            "--preamble",
            type=str,
            help="Code to insert into the generated file after the imports and before the list of classes\n\n"
        )
        parser.add_argument(
            "--disable-str-serializable-types",
            metavar="TYPE",
            default=[],
            nargs="*", type=str,
            help="List of python types for which StringSerializable should be disabled, i.e:\n"
                 "--disable-str-serializable-types float int\n"
                 "Alternatively you could use the name of StringSerializable subclass itself (i.e. IntString)"
                 "\n\n"
        )
        parser.add_argument(
            "-l", "--list",
            nargs=3, action="append", metavar=("<Model name>", "<JSON lookup>", "<JSON file>"),
            help="DEPRECATED, use --model argument instead"
        )

        return parser
5✔
397

398

399
def main():
    """
    Console entry point: parse process arguments, generate the code and print the result
    """
    import os

    coverage_requested = os.getenv("TRAVIS") or os.getenv("FORCE_COVERAGE")
    if coverage_requested:
        # Enable coverage if it is Travis-CI or env variable FORCE_COVERAGE set to true
        import coverage

        coverage.process_startup()

    app = Cli()
    app.parse_args()
    result = app.run()
    print(result)
411

412

413
class FileLoaders:
    """
    Input files parsers, one static method for each value of --input-format argument

    Files are always read as UTF-8 (the same encoding that is used to write the output file)
    instead of platform default encoding, so parsing does not depend on OS locale.
    """
    # Common signature of loaders: path to file -> parsed content
    T = Callable[[Path], Union[dict, list]]

    @staticmethod
    def json(path: Path) -> Union[dict, list]:
        """
        Load JSON file

        :param path: Path to file
        :return: Parsed content
        """
        with path.open(encoding="utf-8") as fp:
            return json.load(fp)

    @staticmethod
    def yaml(path: Path) -> Union[dict, list]:
        """
        Load YAML file using the loader selected at import time

        :param path: Path to file
        :return: Parsed content
        :raises ImportError: if no yaml parser is installed
        """
        if yaml_load is None:
            print('Yaml parser is not installed. To parse yaml files ruamel.yaml (or PyYaml) is required.')
            raise ImportError('yaml')
        with path.open(encoding="utf-8") as fp:
            return yaml_load(fp)

    @staticmethod
    def ini(path: Path) -> dict:
        """
        Load INI file

        :param path: Path to file
        :return: Dict of sections, each section is a dict of its options
        """
        config = configparser.ConfigParser()
        with path.open(encoding="utf-8") as fp:
            config.read_file(fp)
        return {s: dict(config.items(s)) for s in config.sections()}
×
435

436

437
def dict_lookup(d: Union[dict, list], lookup: str) -> Union[dict, list]:
    """
    Extract nested value from key path.
    If lookup is "-" (or empty) returns data as is.

    Each path component is used as dict key. If the current value is a list
    and the component is an integer then it is used as list index, i.e. "items.0.data".

    :param d: Nested dict or list
    :param lookup: Dot separated lookup path
    :return: Nested value
    :raises KeyError: if some dict has no such key
    :raises IndexError: if list index is out of range
    :raises TypeError: if non-integer component is applied to a list
    """
    while lookup and lookup != "-":
        # Take the first component, the rest of the path is processed on the next iteration
        key, _, lookup = lookup.partition('.')
        if isinstance(d, list):
            try:
                key = int(key)
            except ValueError:
                # Keep string key so the list raises TypeError as it did before
                pass
        d = d[key]
    return d
5✔
453

454

455
def iter_json_file(data: Union[dict, list], lookup: str) -> Generator[Union[dict, list], Any, None]:
    """
    Perform lookup and iterate over models data found there.
    This is a generator so the lookup itself is performed when iteration is started.

    :param data: JSON data
    :param lookup: Dot separated lookup path
    :return: Generator of the model data: list items one by one or the single dict
    :raises TypeError: if value found by lookup is neither list nor dict
    """
    found = dict_lookup(data, lookup)

    if isinstance(found, dict):
        yield found
        return

    if isinstance(found, list):
        yield from found
        return

    where = "JSON root" if lookup == "-" else lookup
    raise TypeError(f'dict or list is expected at {where}, not {type(found)}')
×
471

472

473
def process_path(path: str) -> Iterable[Path]:
    """
    Convert path pattern into iterable of paths.
    Path is split at the first component that contains "*" or "?": everything before it
    is the base folder ("." if nothing is left), the rest is a glob pattern.
    If path has no pattern components return tuple of one element: (path,)
    """
    parts = path_split(path)

    # Index of the first pattern component (number of components if there is none)
    first_pattern = next(
        (i for i, part in enumerate(parts) if "*" in part or "?" in part),
        len(parts)
    )
    base_parts = parts[:first_pattern]
    pattern_parts = parts[first_pattern:]

    base = Path(os.path.join(*base_parts) if base_parts else ".")
    if not pattern_parts:
        return base,
    return base.glob(os.path.join(*pattern_parts))
5✔
500

501

502
def path_split(path: str) -> List[str]:
    """
    Split path into list of components

    Root of absolute path (if any) is the first component.
    Trailing separator is ignored: "a/b/" gives the same result as "a/b".

    :param path: string path
    :return: List of files/patterns
    """
    folders = []
    while path:
        head, tail = os.path.split(path)
        if head == path:
            # Split can not shorten the path anymore: only the root is left
            folders.append(head)
            break
        # Empty tail means trailing separator; skip it and go on with the head
        if tail:
            folders.append(tail)
        path = head
    folders.reverse()
    return folders
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc