• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bogdandm / json2python-models / 3822532987

pending completion
3822532987

push

github

bkalashnikov
Update CHANGELOG.md

1577 of 1608 relevant lines covered (98.07%)

4.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.95
/json_to_models/cli.py
1
import argparse
5✔
2
import configparser
5✔
3
import importlib
5✔
4
import itertools
5✔
5
import json
5✔
6
import os.path
5✔
7
import re
5✔
8
import sys
5✔
9
from collections import defaultdict
5✔
10
from datetime import datetime
5✔
11
from pathlib import Path
5✔
12
from typing import Any, Callable, Dict, Generator, Iterable, List, Tuple, Type, Union
5✔
13

14
from .models.sqlmodel import SqlModelCodeGenerator
5✔
15

16
try:
5✔
17
    import ruamel.yaml as yaml
5✔
18
except ImportError:
×
19
    try:
×
20
        import yaml
×
21
    except ImportError:
×
22
        yaml = None
×
23

24
from . import __version__ as VERSION
5✔
25
from .dynamic_typing import ModelMeta, register_datetime_classes, registry
5✔
26
from .generator import MetadataGenerator
5✔
27
from .models import ModelsStructureType
5✔
28
from .models.attr import AttrsModelCodeGenerator
5✔
29
from .models.base import GenericModelCodeGenerator, generate_code
5✔
30
from .models.dataclasses import DataclassModelCodeGenerator
5✔
31
from .models.pydantic import PydanticModelCodeGenerator
5✔
32
from .models.structure import compose_models, compose_models_flat
5✔
33
from .registry import (
5✔
34
    ModelCmp, ModelFieldsEquals, ModelFieldsNumberMatch, ModelFieldsPercentMatch, ModelRegistry
35
)
36
from .utils import convert_args
5✔
37

38
# Signature shared by the models composition functions selectable with -s/--structure:
# a callable that receives a mapping of string keys to ModelMeta and returns ModelsStructureType.
STRUCTURE_FN_TYPE = Callable[[Dict[str, ModelMeta]], ModelsStructureType]
5✔
39
def bool_js_style(s):
    """
    Convert JS-style boolean literal into Python bool.

    :param s: String value, expected to be "true" or "false" (case-sensitive)
    :return: True for "true", False for "false", None for any other value
    """
    return {"true": True, "false": False}.get(s)
5✔
40

41

42
class Cli:
    """
    Command line interface: parses arguments, loads models data and generates code.

    Usage (as done by ``main``): create an instance, call ``parse_args()`` and then ``run()``.
    """

    # Merge policy name -> comparator factory (--merge).
    # NOTE(review): convert_args presumably wraps a callable so that string CLI values are
    # converted with the given functions before the call -- confirm in .utils
    MODEL_CMP_MAPPING = {
        "percent": convert_args(ModelFieldsPercentMatch, lambda s: float(s) / 100),
        "number": convert_args(ModelFieldsNumberMatch, int),
        "exact": ModelFieldsEquals
    }

    # Structure name -> models composition function (-s/--structure)
    STRUCTURE_FN_MAPPING: Dict[str, STRUCTURE_FN_TYPE] = {
        "nested": compose_models,
        "flat": compose_models_flat
    }

    # Framework name -> predefined code generator (-f/--framework)
    MODEL_GENERATOR_MAPPING: Dict[str, Type[GenericModelCodeGenerator]] = {
        "base": convert_args(GenericModelCodeGenerator),
        "attrs": convert_args(AttrsModelCodeGenerator, meta=bool_js_style),
        "dataclasses": convert_args(DataclassModelCodeGenerator, meta=bool_js_style,
                                    post_init_converters=bool_js_style),
        "pydantic": convert_args(PydanticModelCodeGenerator),
        "sqlmodel": convert_args(SqlModelCodeGenerator),
    }

    def __init__(self):
        self.initialized = False
        self.models_data: Dict[str, Iterable[dict]] = {}  # -m/-l
        self.enable_datetime: bool = False  # --datetime
        self.strings_converters: bool = False  # --strings-converters
        self.max_literals: int = -1  # --max-strings-literals
        self.merge_policy: List[ModelCmp] = []  # --merge
        self.structure_fn: STRUCTURE_FN_TYPE = None  # -s
        self.model_generator: Type[GenericModelCodeGenerator] = None  # -f & --code-generator
        self.model_generator_kwargs: Dict[str, Any] = None
        # Attributes below are assigned by parse_args()/set_args(); they are declared here
        # so that every attribute read by run() always exists
        self.output_file: str = ""  # -o
        self.dict_keys_regex: Iterable = ()  # --dict-keys-regex
        self.dict_keys_fields: Iterable[str] = ()  # --dict-keys-fields
        self.preamble: Union[str, None] = None  # --preamble

        self.argparser = self._create_argparser()

    def parse_args(self, args: List[str] = None):
        """
        Parse list of command line arguments and configure this instance

        :param args: (Optional) List of arguments, passed to ``ArgumentParser.parse_args`` as is
        :return: None
        """
        namespace = self.argparser.parse_args(args)

        # Extract args
        # File loader is looked up on FileLoaders by the name of the input format
        loader = getattr(FileLoaders, namespace.input_format)
        self.output_file = namespace.output
        self.enable_datetime = namespace.datetime
        disable_unicode_conversion = namespace.disable_unicode_conversion
        self.strings_converters = namespace.strings_converters
        self.max_literals = namespace.max_strings_literals
        # "percent_95" -> ["percent", "95"]; policies without arguments stay plain strings
        merge_policy = [m.split("_") if "_" in m else m for m in namespace.merge]
        structure = namespace.structure
        framework = namespace.framework
        code_generator = namespace.code_generator
        code_generator_kwargs_raw: List[str] = namespace.code_generator_kwargs
        dict_keys_regex: List[str] = namespace.dict_keys_regex
        dict_keys_fields: List[str] = namespace.dict_keys_fields
        preamble: str = namespace.preamble

        # Here ``registry`` is the module-level object imported from .dynamic_typing
        for name in namespace.disable_str_serializable_types:
            registry.remove_by_name(name)

        self.setup_models_data(namespace.model or (), namespace.list or (), loader)
        self.validate(merge_policy, framework, code_generator)
        self.set_args(merge_policy, structure, framework, code_generator, code_generator_kwargs_raw,
                      dict_keys_regex, dict_keys_fields, disable_unicode_conversion, preamble)

    def run(self):
        """
        Generate models code from loaded data using current settings

        :return: Generated code or (if output file is set) message with the path of written file
        :raises RuntimeError: if the instance was not configured (see ``parse_args``/``set_args``)
        """
        if self.structure_fn is None:
            # Without this guard the call below fails with "'NoneType' object is not callable"
            raise RuntimeError("Cli is not configured: call parse_args() or set_args() before run()")

        if self.enable_datetime:
            register_datetime_classes()
        generator = MetadataGenerator(
            dict_keys_regex=self.dict_keys_regex,
            dict_keys_fields=self.dict_keys_fields
        )
        # Named differently from the module-level ``registry`` (string types registry)
        models_registry = ModelRegistry(*self.merge_policy)
        for name, data in self.models_data.items():
            meta = generator.generate(*data)
            models_registry.process_meta_data(meta, name)
        models_registry.merge_models(generator)
        models_registry.generate_names()
        structure = self.structure_fn(models_registry.models_map)
        output = self.version_string + generate_code(
            structure,
            self.model_generator,
            class_generator_kwargs=self.model_generator_kwargs,
            preamble=self.preamble
        )
        if not self.output_file:
            return output

        with open(self.output_file, "w", encoding="utf-8") as f:
            f.write(output)
        return f"Output is written to {self.output_file}"

    @property
    def version_string(self):
        """
        Header of generated file: raw docstring with package version, current time and command line
        """
        return (
            'r"""\n'
            f'generated by json2python-models v{VERSION} at {datetime.now().ctime()}\n'
            f'command: {" ".join(sys.argv)}\n'
            '"""\n'
        )

    def validate(self, merge_policy, framework, code_generator):
        """
        Validate parsed args

        :param merge_policy: List of merge policies. Each merge policy is either string or string and policy arguments
        :param framework: Framework name (predefined code generator)
        :param code_generator: Code generator import string
        :return:
        :raises ValueError: on unknown merge policy or inconsistent framework/code generator pair
        """
        for m in merge_policy:
            policy_name = m[0] if isinstance(m, list) else m
            if policy_name not in self.MODEL_CMP_MAPPING:
                # list(...) renders as ['percent', ...] instead of dict_keys([...])
                raise ValueError(
                    f"Invalid merge policy '{policy_name}', choices are {list(self.MODEL_CMP_MAPPING)}"
                )

        if framework == 'custom' and code_generator is None:
            raise ValueError("You should specify --code-generator to support custom generator")
        elif framework != 'custom' and code_generator is not None:
            raise ValueError("--code-generator argument has no effect without '--framework custom' argument")

    def setup_models_data(
            self,
            models: Iterable[Union[
                Tuple[str, str],
                Tuple[str, str, str],
            ]],
            models_lists: Iterable[Tuple[str, str, str]],
            parser: 'FileLoaders.T'
    ):
        """
        Load models data from files and store it in ``models_data`` grouped by model name.

        Every file matched by a path/pattern is parsed immediately.

        :param models: (<model name>, <path>) or (<model name>, <lookup>, <path>) tuples (--model)
        :param models_lists: (<model name>, <lookup>, <path>) tuples (--list)
        :param parser: File loader that converts a path into parsed data
        :raises RuntimeError: if some tuple has neither 2 nor 3 items
        """
        models_dict: Dict[str, List[dict]] = defaultdict(list)

        for model_tuple in list(models) + list(models_lists):
            if len(model_tuple) == 2:
                model_name, path_raw = model_tuple
                lookup = '-'  # '-' means "no lookup", root of the file is used
            elif len(model_tuple) == 3:
                model_name, lookup, path_raw = model_tuple
            else:
                raise RuntimeError('`--model` argument should contain exactly 2 or 3 strings')

            for real_path in process_path(path_raw):
                models_dict[model_name].extend(iter_json_file(parser(real_path), lookup))

        self.models_data = models_dict

    @staticmethod
    def _parse_code_generator_kwarg(item: str) -> Tuple[str, str]:
        """
        Split single "NAME=VALUE" item of --code-generator-kwargs (optionally wrapped in double quotes)

        :raises ValueError: if item does not have NAME=VALUE form
        """
        stripped = item
        # startswith/endswith are safe on empty strings, indexing [0]/[-1] is not
        if stripped.startswith('"'):
            stripped = stripped[1:]
        if stripped.endswith('"'):
            stripped = stripped[:-1]
        name, separator, value = stripped.partition("=")
        if not separator or not name:
            raise ValueError(f"Invalid code generator argument {item!r}, expected format is NAME=VALUE")
        return name, value

    def set_args(
            self,
            merge_policy: List[Union[List[str], str]],
            structure: str,
            framework: str,
            code_generator: str,
            code_generator_kwargs_raw: List[str],
            dict_keys_regex: List[str],
            dict_keys_fields: List[str],
            disable_unicode_conversion: bool,
            preamble: str,
    ):
        """
        Convert CLI args to python representation and set them to appropriate object attributes

        :raises ValueError: on malformed --code-generator or --code-generator-kwargs values
        """
        self.merge_policy.clear()
        for merge in merge_policy:
            if isinstance(merge, str):
                name, args = merge, ()
            else:
                name, *args = merge
            self.merge_policy.append(self.MODEL_CMP_MAPPING[name](*args))

        self.structure_fn = self.STRUCTURE_FN_MAPPING[structure]

        if framework != "custom":
            self.model_generator = self.MODEL_GENERATOR_MAPPING[framework]
        else:
            module_name, _, class_name = code_generator.rpartition('.')
            if not module_name or not class_name:
                raise ValueError(
                    "--code-generator should be an absolute import path "
                    f"(i.e. 'package.module.ClassName'), got '{code_generator}'"
                )
            self.model_generator = getattr(importlib.import_module(module_name), class_name)

        self.model_generator_kwargs = dict(
            post_init_converters=self.strings_converters,
            convert_unicode=not disable_unicode_conversion,
            max_literals=self.max_literals
        )
        # User-defined kwargs are applied last so they override the defaults above;
        # values are kept as strings
        for item in code_generator_kwargs_raw or ():
            name, value = self._parse_code_generator_kwarg(item)
            self.model_generator_kwargs[name] = value

        # Non-capturing group keeps ^ and $ applied to the whole expression:
        # without it "a|b" would become "^a|b$" (starts with "a" OR ends with "b")
        self.dict_keys_regex = [re.compile(rf"^(?:{r})$") for r in dict_keys_regex] if dict_keys_regex else ()
        self.dict_keys_fields = dict_keys_fields or ()
        if preamble:
            preamble = preamble.strip()
        self.preamble = preamble or None
        self.initialized = True

    @classmethod
    def _create_argparser(cls) -> argparse.ArgumentParser:
        """
        ArgParser factory
        """
        parser = argparse.ArgumentParser(
            formatter_class=argparse.RawTextHelpFormatter,
            description="Convert given json files into Python models."
        )

        parser.add_argument(
            "-m", "--model",
            nargs="+", action="append", metavar=("<Model name> [<JSON lookup>] <File path or pattern>", ""),
            help="Model name and its JSON data as path or unix-like path pattern.\n"
                 "'*',  '**' or '?' patterns symbols are supported.\n\n"
                 "JSON data could be array of models or single model\n\n"
                 "If this file contains dict with nested list than you can pass\n"
                 "<JSON lookup>. Deep lookups are supported by dot-separated path.\n"
                 "If no lookup needed pass '-' as <JSON lookup> (default)\n\n"
        )
        parser.add_argument(
            "-i", "--input-format",
            default="json",
            choices=['json', 'yaml', 'ini'],
            help="Input files parser ('PyYaml' is required to parse yaml files)\n\n"
        )
        parser.add_argument(
            "-o", "--output",
            metavar="FILE", default="",
            help="Path to output file\n\n"
        )
        parser.add_argument(
            "-f", "--framework",
            default="base",
            choices=list(cls.MODEL_GENERATOR_MAPPING.keys()) + ["custom"],
            help="Model framework for which python code is generated.\n"
                 "'base' (default) mean no framework so code will be generated without any decorators\n"
                 "and additional meta-data.\n"
                 "If you pass 'custom' you should specify --code-generator argument\n\n"
        )
        parser.add_argument(
            "-s", "--structure",
            default="flat",
            choices=list(cls.STRUCTURE_FN_MAPPING.keys()),
            # Help text is aligned with the actual default ("flat")
            help="Models composition style. Default is 'flat'.\n"
                 "With 'nested' nested models become nested Python classes.\n\n"
        )
        parser.add_argument(
            "--datetime",
            action="store_true",
            help="Enable datetime/date/time strings parsing.\n"
                 "Warn.: This can lead to 6-7 times slowdown on large datasets.\n"
                 "       Be sure that you really need this option.\n\n"
        )
        parser.add_argument(
            "--strings-converters",
            action="store_true",
            help="Enable generation of string types converters (i.e. IsoDatetimeString or BooleanString).\n\n"
        )
        parser.add_argument(
            "--max-strings-literals",
            type=int,
            default=GenericModelCodeGenerator.DEFAULT_MAX_LITERALS,
            metavar='NUMBER',
            help="Generate Literal['foo', 'bar'] when field have less than NUMBER string constants as values.\n"
                 f"Pass 0 to disable. By default NUMBER={GenericModelCodeGenerator.DEFAULT_MAX_LITERALS}"
                 f" (some generator classes could override it)\n\n"
        )
        parser.add_argument(
            "--disable-unicode-conversion", "--no-unidecode",
            action="store_true",
            help="Disabling unicode conversion in fields and class names.\n\n"
        )

        default_percent = f"{ModelFieldsPercentMatch.DEFAULT * 100:.0f}"
        default_number = f"{ModelFieldsNumberMatch.DEFAULT:.0f}"
        parser.add_argument(
            "--merge",
            default=["percent", "number"],
            nargs="+",
            # "%%" is required because argparse applies %-formatting to help strings
            help=(
                f"Merge policy settings. Default is 'percent_{default_percent} number_{default_number}' (percent of field match\n"
                "or number of fields match).\n"
                "Possible values are:\n"
                "'percent[_<percent>]' - two models had a certain percentage of matched field names.\n"
                f"                        Default percent is {default_percent}%%. "
                "Custom value could be i.e. 'percent_95'.\n"
                "'number[_<number>]'   - two models had a certain number of matched field names.\n"
                f"                        Default number of fields is {default_number}.\n"
                "'exact'               - two models should have exact same field names to merge.\n\n"
            )
        )
        parser.add_argument(
            "--dict-keys-regex", "--dkr",
            nargs="+", metavar="RegEx",
            help="List of regular expressions (Python syntax).\n"
                 "If all keys of some dict are match one of them\n"
                 "then this dict will be marked as dict field but not nested model.\n"
                 "Note: ^ and $ tokens will be added automatically but you have to\n"
                 "escape other special characters manually.\n"
        )
        parser.add_argument(
            "--dict-keys-fields", "--dkf",
            nargs="+", metavar="FIELD NAME",
            help="List of model fields names that will be marked as dict fields\n\n"
        )
        parser.add_argument(
            "--code-generator",
            help="Absolute import path to GenericModelCodeGenerator subclass.\n"
                 "Works in pair with '-f custom'\n\n"
        )
        parser.add_argument(
            "--code-generator-kwargs",
            metavar="NAME=VALUE",
            nargs="*", type=str,
            help="List of code generator arguments (for __init__ method).\n"
                 "Each argument should be in following format:\n"
                 "    argument_name=value or \"argument_name=value with space\"\n"
                 "Boolean values should be passed in JS style: true | false"
                 "\n\n"
        )
        parser.add_argument(
            "--preamble",
            type=str,
            help="Code to insert into the generated file after the imports and before the list of classes\n\n"
        )
        parser.add_argument(
            "--disable-str-serializable-types",
            metavar="TYPE",
            default=[],
            nargs="*", type=str,
            help="List of python types for which StringSerializable should be disabled, i.e:\n"
                 "--disable-str-serializable-types float int\n"
                 "Alternatively you could use the name of StringSerializable subclass itself (i.e. IntString)"
                 "\n\n"
        )
        parser.add_argument(
            "-l", "--list",
            nargs=3, action="append", metavar=("<Model name>", "<JSON lookup>", "<JSON file>"),
            help="DEPRECATED, use --model argument instead"
        )

        return parser
5✔
394

395

396
def main():
    """
    Console entry point: parse command line arguments, run generation and print the result.

    Coverage measurement is started first when TRAVIS or FORCE_COVERAGE environment
    variable is set to a non-empty value.
    """
    import os

    coverage_requested = os.getenv("TRAVIS", None) or os.getenv("FORCE_COVERAGE", None)
    if coverage_requested:
        # Imported lazily: coverage is needed only for this case
        import coverage

        coverage.process_startup()

    app = Cli()
    app.parse_args()
    result = app.run()
    print(result)
408

409

410
class FileLoaders:
    """
    Collection of input files parsers. Parser is selected by its name (attribute of this class).

    Files are always read as UTF-8 instead of the locale-dependent default encoding,
    so the same input is parsed identically on every platform.
    """
    # Signature of a loader: path of the file -> parsed data
    T = Callable[[Path], Union[dict, list]]

    @staticmethod
    def json(path: Path) -> Union[dict, list]:
        """
        Parse JSON file

        :param path: Path to the file
        :return: Parsed JSON data
        """
        # Inside the function ``json`` is the module, not this method
        with path.open(encoding="utf-8") as fp:
            return json.load(fp)

    @staticmethod
    def yaml(path: Path) -> Union[dict, list]:
        """
        Parse YAML file

        :param path: Path to the file
        :return: Parsed YAML data
        :raises ImportError: if no yaml parser is installed
        """
        # ``yaml`` is the optional module imported at the top of the file (None if missing)
        if yaml is None:
            print('Yaml parser is not installed. To parse yaml files PyYaml (or ruamel.yaml) is required.')
            raise ImportError('yaml')
        with path.open(encoding="utf-8") as fp:
            return yaml.safe_load(fp)

    @staticmethod
    def ini(path: Path) -> dict:
        """
        Parse INI file

        :param path: Path to the file
        :return: Dict of sections, each section is a dict of its options
        """
        config = configparser.ConfigParser()
        with path.open(encoding="utf-8") as fp:
            config.read_file(fp)
        return {s: dict(config.items(s)) for s in config.sections()}
5✔
432

433

434
def dict_lookup(d: Union[dict, list], lookup: str) -> Union[dict, list]:
    """
    Follow dot-separated key path inside nested data.

    Empty lookup or "-" (also as the remaining part of the path) stops the descent
    and the value reached so far is returned.

    :param d: Nested dict
    :param lookup: Dot separated lookup path
    :return: Nested value
    """
    current = d
    remaining = lookup
    while remaining and remaining != "-":
        # Take the first key and keep the rest of the path for the next iteration
        key, _, remaining = remaining.partition('.')
        current = current[key]
    return current
5✔
450

451

452
def iter_json_file(data: Union[dict, list], lookup: str) -> Generator[Union[dict, list], Any, None]:
    """
    Perform lookup and iterate over found models data.

    This is a generator: lookup is performed when iteration is started.
    List is yielded item by item, dict is yielded as the only item.

    :param data: JSON data
    :param lookup: Dot separated lookup path
    :return: Generator of the model data
    :raises TypeError: if found value is neither list nor dict
    """
    found = dict_lookup(data, lookup)
    if isinstance(found, dict):
        yield found
        return
    if not isinstance(found, list):
        raise TypeError(f'dict or list is expected at {lookup if lookup != "-" else "JSON root"}, not {type(found)}')
    yield from found
5✔
468

469

470
def process_path(path: str) -> Iterable[Path]:
    """
    Expand path pattern into iterable of paths.

    Leading components without "*" and "?" form the base directory ("." if there are none),
    the rest is used as glob pattern relative to it.
    Path without pattern symbols is returned as one-element tuple: (path,)
    """
    components = path_split(path)

    def is_literal(part):
        return "*" not in part and "?" not in part

    literal_parts = list(itertools.takewhile(is_literal, components))
    pattern_parts = components[len(literal_parts):]

    base = Path(os.path.join(*literal_parts) if literal_parts else ".")
    if not pattern_parts:
        return (base,)
    return base.glob(os.path.join(*pattern_parts))
5✔
497

498

499
def path_split(path: str) -> List[str]:
    """
    Break path into its components using repeated ``os.path.split``

    :param path: string path
    :return: List of files/patterns
    """
    components: List[str] = []
    head = path
    while True:
        head, tail = os.path.split(head)
        if not tail:
            break
        # Components are found from the end, so each one goes in front of the previous
        components.insert(0, tail)
    if head:
        # Whatever is left unsplit (i.e. root) becomes the first component
        components.insert(0, head)
    return components
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc