• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ValeriyMenshikov / restcodegen / 19740858336

27 Nov 2025 03:16PM UTC coverage: 83.311% (-3.5%) from 86.838%
19740858336

push

github

web-flow
Merge pull request #9 from ValeriyMenshikov/release/2.0.1

Release/2.0.1

340 of 414 new or added lines in 10 files covered. (82.13%)

7 existing lines in 2 files now uncovered.

614 of 737 relevant lines covered (83.31%)

2.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.52
/restcodegen/generator/parser.py
1
from __future__ import annotations
3✔
2

3
import json
3✔
4
import tempfile
3✔
5
from dataclasses import dataclass
3✔
6
import re
3✔
7
from pathlib import Path
3✔
8
from typing import Any, Dict
3✔
9

10
from datamodel_code_generator import OpenAPIScope, PythonVersion
3✔
11
from datamodel_code_generator.parser.openapi import (
3✔
12
    OpenAPIParser,
13
    Operation,
14
    ParameterObject,
15
    ReferenceObject,
16
    RequestBodyObject,
17
    ResponseObject,
18
)
19

20
from restcodegen.generator.log import LOGGER
3✔
21
from restcodegen.generator.utils import name_to_snake, rename_python_builtins, snake_to_camel
3✔
22
from pydantic import BaseModel, ConfigDict
3✔
23
from restcodegen.generator.spec.loader import SpecLoader
3✔
24

25

26
OPERATION_NAMES: set[str] = {"get", "put", "post", "delete", "patch", "head", "options", "trace"}
3✔
27

28
TYPE_MAP = {
3✔
29
    "integer": "int",
30
    "number": "float",
31
    "string": "str",
32
    "boolean": "bool",
33
    "array": "list",
34
    "object": "Any",
35
}
36

37

38
@dataclass(slots=True)
3✔
39
class ParsedOperation:
3✔
40
    path: str
3✔
41
    method: str
3✔
42
    operation: Operation
3✔
43
    parameters: list[ParameterObject]
3✔
44
    request_body: RequestBodyObject | None
3✔
45
    responses: dict[str, ResponseObject]
3✔
46
    raw_operation: dict[str, Any]
3✔
47

48

49
class OperationContext(BaseModel):
3✔
50
    model_config = ConfigDict(arbitrary_types_allowed=True)
3✔
51

52
    operation: Operation
3✔
53
    method: str
3✔
54
    path: str
3✔
55
    raw_path: str
3✔
56
    tags: list[str]
3✔
57
    summary: str | None
3✔
58
    operation_id: str
3✔
59
    parameters: dict[str, list[dict[str, Any]]]
3✔
60
    request_body_model: str | None
3✔
61
    responses: dict[str, str]
3✔
62
    success_response: str | None
3✔
63

64
    def get(self, key: str, default: Any = None) -> Any:
3✔
UNCOV
65
        return getattr(self, key, default)
×
66

67

68
class Parser:
3✔
69
    BASE_PATH = Path.cwd() / "clients" / "http"
3✔
70

71
    def __init__(
3✔
72
        self,
73
        openapi_spec: dict[str, Any],
74
        service_name: str,
75
        selected_tags: list[str] | None = None,
76
    ) -> None:
77
        self.openapi_spec = openapi_spec
3✔
78
        self._raw_spec = openapi_spec
3✔
79
        self._service_name = service_name
3✔
80
        self.version: str = ""
3✔
81
        self.description: str = ""
3✔
82
        self.openapi_version: str = ""
3✔
83
        self._selected_tags: set[str] = set(selected_tags) if selected_tags else set()
3✔
84
        self.all_tags: set[str] = set()
3✔
85
        self.request_model_names: set[str] = set()
3✔
86
        self.response_model_names: set[str] = set()
3✔
87
        self._operations: list[ParsedOperation] | None = None
3✔
88
        self.parse()
3✔
89

90
    @classmethod
3✔
91
    def from_source(
3✔
92
        cls,
93
        openapi_spec: str,
94
        package_name: str,
95
        *,
96
        selected_tags: list[str] | None = None,
97
        loader: SpecLoader | None = None,
98
    ) -> "Parser":
99
        spec_loader = loader or SpecLoader(openapi_spec, package_name)
3✔
100
        spec = spec_loader.open()
3✔
101
        return cls(spec, package_name, selected_tags=selected_tags)
3✔
102

103
    @property
3✔
104
    def apis(self) -> set[str]:
3✔
105
        if not self._selected_tags:
3✔
106
            return self.all_tags
3✔
107

108
        result_tags = {tag for tag in self._selected_tags if tag in self.all_tags}
3✔
109
        if not result_tags:
3✔
110
            LOGGER.warning(
3✔
111
                "Ни один из выбранных тегов не найден в спецификации. Будут использованы все доступные теги."
112
            )
113
            return self.all_tags
3✔
114

115
        return result_tags
3✔
116

117
    @property
3✔
118
    def service_name(self) -> str:
3✔
119
        return self._service_name
3✔
120

121
    def parse(self) -> list[ParsedOperation]:
3✔
122
        info = self.openapi_spec.get("info", {})
3✔
123
        self.version = info.get("version", "1.0.0")
3✔
124
        self.description = info.get("description", "")
3✔
125
        self.openapi_version = self.openapi_spec.get("openapi", "") or self.openapi_spec.get("swagger", "")
3✔
126

127
        if self.openapi_version.startswith("2."):
3✔
NEW
128
            LOGGER.warning(
×
129
                "OpenAPI/Swagger version 2.0 требует конвертации в 3.x. "
130
                "Пожалуйста, обновите спецификацию перед генерацией."
131
            )
132

133
        parser = self._init_openapi_parser()
3✔
134
        operations = self._collect_operations(parser)
3✔
135
        tags, request_models, response_models = self._collect_metadata(operations)
3✔
136

137
        self._operations = operations
3✔
138
        self.all_tags = tags
3✔
139
        self.request_model_names = request_models
3✔
140
        self.response_model_names = response_models
3✔
141
        return operations
3✔
142

143
    @property
3✔
144
    def operations(self) -> list[ParsedOperation]:
3✔
145
        return list(self._operations or [])
3✔
146

147
    def get_operation_context(self, operation: ParsedOperation) -> OperationContext:
3✔
148
        parameters = self._build_parameters(operation)
3✔
149
        if not parameters["path"]:
3✔
150
            parameters["path"] = self._fallback_path_parameters(operation.path)
3✔
151

152
        responses = self._extract_response_models(operation.responses)
3✔
153
        request_body_model = self._extract_request_body_model(operation)
3✔
154
        success_response = self._get_success_response(responses)
3✔
155

156
        operation_id = operation.operation.operationId or self._generate_operation_id(operation)
3✔
157

158
        return OperationContext(
3✔
159
            operation=operation.operation,
160
            method=operation.method,
161
            path=self._normalize_path(operation.path),
162
            raw_path=operation.path,
163
            tags=operation.operation.tags or [],
164
            summary=operation.operation.summary,
165
            operation_id=operation_id,
166
            parameters=parameters,
167
            request_body_model=request_body_model,
168
            responses=responses,
169
            success_response=success_response,
170
        )
171

172
    def handlers_by_tag(self, tag: str) -> list[ParsedOperation]:
3✔
173
        return [operation for operation in self.operations if tag in (operation.operation.tags or [])]
3✔
174

175
    def handlers_by_method(self, method: str) -> list[ParsedOperation]:
3✔
NEW
176
        return [operation for operation in self.operations if operation.method.lower() == method.lower()]
×
177

178
    def handler_by_path(self, path: str) -> list[ParsedOperation]:
3✔
NEW
179
        normalized = self._normalize_path(path)
×
NEW
180
        return [operation for operation in self.operations if self._normalize_path(operation.path) == normalized]
×
181

182
    def request_models(self) -> set[str]:
3✔
183
        return set(self.request_model_names)
3✔
184

185
    def response_models(self) -> set[str]:
3✔
186
        return set(self.response_model_names)
3✔
187

188
    def models_by_tag(self, tag: str) -> set[str]:
3✔
189
        models: set[str] = set()
3✔
190
        for operation in self.operations:
3✔
191
            if tag in (operation.operation.tags or []):
3✔
192
                request_model = self._extract_request_body_model(operation)
3✔
193
                if request_model:
3✔
194
                    models.add(request_model)
3✔
195
                responses = self._extract_response_models(operation.responses)
3✔
196
                models.update(responses.values())
3✔
197
                for parameter in operation.parameters:
3✔
198
                    param_type = self._extract_parameter_type(parameter)
3✔
199
                    if self._is_complex_type(param_type):
3✔
NEW
200
                        models.add(param_type)
×
201
        return models
3✔
202

203
    def _init_openapi_parser(self) -> OpenAPIParser:
3✔
204
        with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as tmp_file:
3✔
205
            json.dump(self._raw_spec, tmp_file)
3✔
206
            tmp_path = Path(tmp_file.name)
3✔
207

208
        try:
3✔
209
            parser = OpenAPIParser(
3✔
210
                tmp_path,
211
                target_python_version=PythonVersion.PY_310,
212
                openapi_scopes=[OpenAPIScope.Schemas, OpenAPIScope.Paths],
213
                include_path_parameters=True,
214
            )
215
            parser.parse()
3✔
216
        finally:
217
            tmp_path.unlink(missing_ok=True)
3✔
218

219
        return parser
3✔
220

221
    @staticmethod
3✔
222
    def _collect_operations(parser: OpenAPIParser) -> list[ParsedOperation]:
3✔
223
        operations: list[ParsedOperation] = []
3✔
224
        raw_paths: Dict[str, Dict[str, Any]] = parser.raw_obj.get("paths", {})  # type: ignore[assignment]
3✔
225
        for path, methods in raw_paths.items():
3✔
226
            if not isinstance(methods, dict):
3✔
NEW
227
                continue
×
228

229
            for method, raw_operation in methods.items():
3✔
230
                if method not in OPERATION_NAMES:
3✔
UNCOV
231
                    continue
×
232

233
                operation = Operation.model_validate(raw_operation)
3✔
234
                resolved_parameters: list[ParameterObject] = []
3✔
235
                for parameter in operation.parameters:
3✔
236
                    if isinstance(parameter, ReferenceObject):
3✔
NEW
237
                        resolved_param = parser.get_ref_model(parameter.ref)
×
NEW
238
                        resolved_parameters.append(ParameterObject.model_validate(resolved_param))
×
239
                    else:
240
                        resolved_parameters.append(parameter)
3✔
241

242
                request_body: RequestBodyObject | None = None
3✔
243
                if operation.requestBody is not None:
3✔
244
                    if isinstance(operation.requestBody, ReferenceObject):
3✔
NEW
245
                        ref_body = parser.get_ref_model(operation.requestBody.ref)
×
NEW
246
                        request_body = RequestBodyObject.model_validate(ref_body)
×
247
                    else:
248
                        request_body = operation.requestBody
3✔
249

250
                resolved_responses: dict[str, ResponseObject] = {}
3✔
251
                for status_code, response in operation.responses.items():
3✔
252
                    if isinstance(response, ReferenceObject):
3✔
NEW
253
                        ref_response = parser.get_ref_model(response.ref)
×
NEW
254
                        resolved_responses[str(status_code)] = ResponseObject.model_validate(ref_response)
×
255
                    else:
256
                        resolved_responses[str(status_code)] = response
3✔
257

258
                operations.append(
3✔
259
                    ParsedOperation(
260
                        path=path,
261
                        method=method,
262
                        operation=operation,
263
                        parameters=resolved_parameters,
264
                        request_body=request_body,
265
                        responses=resolved_responses,
266
                        raw_operation=raw_operation,
267
                    )
268
                )
269

270
        return operations
3✔
271

272
    def _collect_metadata(self, operations: list[ParsedOperation]) -> tuple[set[str], set[str], set[str]]:
3✔
273
        tags: set[str] = set()
3✔
274
        request_models: set[str] = set()
3✔
275
        response_models: set[str] = set()
3✔
276

277
        for operation in operations:
3✔
278
            tags.update(operation.operation.tags or [])
3✔
279
            request_body_model = self._extract_request_body_model(operation)
3✔
280
            if request_body_model:
3✔
281
                request_models.add(request_body_model)
3✔
282

283
            responses = self._extract_response_models(operation.responses)
3✔
284
            response_models.update(responses.values())
3✔
285

286
        return tags, request_models, response_models
3✔
287

288
    @staticmethod
3✔
289
    def _normalize_path(path: str) -> str:
3✔
290
        def replace_placeholder(match: re.Match[str]) -> str:
3✔
UNCOV
291
            placeholder = match.group(0)[1:-1]
×
UNCOV
292
            if not placeholder:
×
293
                return ""
×
NEW
294
            return "{" + rename_python_builtins(name_to_snake(placeholder)) + "}"
×
295

296
        return re.sub(r"\{[^}]*\}", replace_placeholder, path)
3✔
297

298
    @staticmethod
3✔
299
    def _extract_parameter_type(parameter: ParameterObject) -> str:
3✔
300
        schema = parameter.schema_
3✔
301
        if schema and schema.ref:
3✔
NEW
302
            return Parser._ref_to_model_name(schema.ref)
×
303
        if schema and schema.type:
3✔
304
            schema_type = schema.type.value if hasattr(schema.type, "value") else schema.type
3✔
305
            if isinstance(schema_type, list):
3✔
NEW
306
                schema_type = schema_type[0]
×
307
            mapped = TYPE_MAP.get(str(schema_type).lower())
3✔
308
            if mapped:
3✔
309
                return mapped
3✔
NEW
310
            return str(schema_type)
×
NEW
311
        return "Any"
×
312

313
    def _extract_request_body_model(self, operation: ParsedOperation) -> str | None:
3✔
314
        if not operation.request_body:
3✔
315
            return None
3✔
316
        for media in operation.request_body.content.values():
3✔
317
            schema = media.schema_
3✔
318
            if isinstance(schema, ReferenceObject):
3✔
319
                return self._ref_to_model_name(schema.ref)
3✔
NEW
320
            if schema and schema.ref:
×
NEW
321
                return self._ref_to_model_name(schema.ref)
×
NEW
322
        return None
×
323

324
    @staticmethod
3✔
325
    def _ref_to_model_name(ref: str) -> str:
3✔
326
        name = ref.split("/")[-1]
3✔
327
        return snake_to_camel(name)
3✔
328

329
    @staticmethod
3✔
330
    def _extract_response_models(responses: dict[str, ResponseObject]) -> dict[str, str]:
3✔
331
        result: dict[str, str] = {}
3✔
332
        for status_code, response in responses.items():
3✔
333
            for media in response.content.values():
3✔
334
                schema = media.schema_
3✔
335
                if isinstance(schema, ReferenceObject):
3✔
336
                    result[status_code] = Parser._ref_to_model_name(schema.ref)
3✔
NEW
337
                elif schema and schema.ref:
×
NEW
338
                    result[status_code] = Parser._ref_to_model_name(schema.ref)
×
339
        return result
3✔
340

341
    @staticmethod
3✔
342
    def _extract_parameter_location(parameter: ParameterObject) -> str:
3✔
NEW
343
        if parameter.in_:
×
NEW
344
            return parameter.in_.value
×
NEW
345
        return "query"
×
346

347
    def _build_parameters(self, operation: ParsedOperation) -> dict[str, list[dict[str, Any]]]:
3✔
348
        path_params: list[dict[str, Any]] = []
3✔
349
        query_params: list[dict[str, Any]] = []
3✔
350
        header_params: list[dict[str, Any]] = []
3✔
351

352
        for parameter in operation.parameters:
3✔
NEW
353
            context = self._build_parameter_context(parameter)
×
NEW
354
            location = context["location"]
×
NEW
355
            if location == "path":
×
NEW
356
                path_params.append(context)
×
NEW
357
            elif location == "header":
×
NEW
358
                header_params.append(context)
×
359
            else:
NEW
360
                query_params.append(context)
×
361

362
        return {
3✔
363
            "path": self._sort_parameters(path_params),
364
            "query": self._sort_parameters(query_params),
365
            "header": self._sort_parameters(header_params),
366
        }
367

368
    def _build_parameter_context(self, parameter: ParameterObject) -> dict[str, Any]:
3✔
NEW
369
        name = parameter.name or "param"
×
NEW
370
        python_name = rename_python_builtins(name_to_snake(name))
×
NEW
371
        type_str = self._extract_parameter_type(parameter)
×
NEW
372
        description = parameter.description or ""
×
NEW
373
        default: Any | None = None
×
NEW
374
        if parameter.schema_ and parameter.schema_.default is not None:
×
NEW
375
            default = parameter.schema_.default
×
NEW
376
        location = self._extract_parameter_location(parameter)
×
377

NEW
378
        if not python_name:
×
NEW
379
            fallback_source = f"{location}_{name}" if name else f"{location}_param"
×
NEW
380
            python_name = rename_python_builtins(name_to_snake(fallback_source)) or f"{location}_param"
×
381

NEW
382
        return {
×
383
            "name": name,
384
            "python_name": python_name,
385
            "type": type_str,
386
            "required": bool(parameter.required),
387
            "description": description,
388
            "default": default,
389
            "location": location,
390
        }
391

392
    @staticmethod
3✔
393
    def _sort_parameters(parameters: list[dict[str, Any]]) -> list[dict[str, Any]]:
3✔
394
        return sorted(parameters, key=lambda item: not item["required"])
3✔
395

396
    @staticmethod
3✔
397
    def _fallback_path_parameters(path: str) -> list[dict[str, Any]]:
3✔
398
        params: list[dict[str, Any]] = []
3✔
399
        for index, match in enumerate(re.findall(r"\{([^}]+)\}", path), start=1):
3✔
NEW
400
            python_name = rename_python_builtins(name_to_snake(match))
×
NEW
401
            if not python_name:
×
NEW
402
                python_name = f"path_param_{index}"
×
NEW
403
            params.append(
×
404
                {
405
                    "name": match,
406
                    "python_name": python_name,
407
                    "type": "str",
408
                    "required": True,
409
                    "description": f"Path parameter: {match}",
410
                    "default": None,
411
                    "location": "path",
412
                }
413
            )
414
        return params
3✔
415

416
    @staticmethod
3✔
417
    def _get_success_response(responses: dict[str, str]) -> str | None:
3✔
418
        for status in ("200", "201", "202"):
3✔
419
            if status in responses:
3✔
420
                return responses[status]
3✔
NEW
421
        return None
×
422

423
    @staticmethod
3✔
424
    def _generate_operation_id(operation: ParsedOperation) -> str:
3✔
NEW
425
        normalized_path = Parser._normalize_path(operation.path).strip("/")
×
NEW
426
        if normalized_path:
×
NEW
427
            segments = [segment.replace("_", "__") for segment in normalized_path.split("/") if segment]
×
NEW
428
            escaped_path = "_".join(segments) or "root"
×
429
        else:
NEW
430
            escaped_path = "root"
×
431

NEW
432
        candidate = f"{operation.method}_{escaped_path}"
×
NEW
433
        return rename_python_builtins(name_to_snake(candidate))
×
434

435
    @staticmethod
3✔
436
    def _is_complex_type(type_name: str) -> bool:
3✔
437
        builtin_types = {"int", "float", "str", "bool", "list", "dict", "Any"}
3✔
438
        return type_name not in builtin_types
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc