• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ValeriyMenshikov / restcodegen / 15833858062

23 Jun 2025 07:59PM UTC coverage: 64.627%. First build
15833858062

push

github

web-flow
Merge pull request #1 from ValeriyMenshikov/manual_refactor

Manual refactor

260 of 293 new or added lines in 12 files covered. (88.74%)

433 of 670 relevant lines covered (64.63%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.23
/restcodegen/generator/parser.py
1
import re
1✔
2
from pathlib import Path
1✔
3
from typing import Any
1✔
4

5
from pydantic import BaseModel, Field, HttpUrl
1✔
6

7
from restcodegen.generator.log import LOGGER
1✔
8
from restcodegen.generator.parameters import (
1✔
9
    BaseParameter,
10
    ParameterType,
11
)
12
from restcodegen.generator.spec_loader import SpecLoader
1✔
13
from restcodegen.generator.utils import (
1✔
14
    name_to_snake,
15
    rename_python_builtins,
16
    snake_to_camel,
17
)
18

19
# Maps a lower-cased schema type name to the Python type name used for
# generated parameters. "anyof" and "none" are not real schema types: the
# parser substitutes "anyof" when a schema carries an ``anyOf`` list, and
# "none" is what ``str(None).lower()`` yields when a schema has no type.
TYPE_MAP = {
    "integer": "int",
    "number": "float",
    "string": "str",
    "boolean": "bool",
    "array": "list",
    "anyof": "str",
    "none": "Any",
}

# Default value assigned to a parameter, keyed by its Python type name.
# Type names missing from this map get no default.
DEFAULT_HEADER_VALUE_MAP = {
    "int": 0,
    "float": 0.0,
    "str": "",
    "bool": True,
}
1✔
30

31

32
class Handler(BaseModel):
    """A single API operation (one path + method pair) taken from the spec.

    ``path``, ``method`` and ``tags`` are required; every other field is
    optional and defaults to ``None``.
    """

    # Required fields.
    path: str
    method: str
    tags: list

    # Optional metadata copied from the operation object.
    summary: str | None = None
    operation_id: str | None = None

    # Parameters grouped by where they are sent.
    path_parameters: list[BaseParameter] | None = None
    query_parameters: list[BaseParameter] | None = None
    headers: list[BaseParameter] | None = None

    # Name of the request model, and a mapping of responses to model names.
    request_body: str | None = None
    responses: dict | None = None

    def get(self, key: str, default: Any = None) -> Any:
        """Return the attribute named ``key``, or ``default`` if it is absent."""
        return getattr(self, key, default)
×
46

47

48
class OpenApiSpec(BaseModel):
    """Summary of a parsed specification: metadata, handlers and name sets.

    The metadata fields and ``handlers`` are required; the four set fields
    default to empty sets.
    """

    # Required metadata.
    service_name: str
    version: str
    title: str
    description: str
    openapi_version: str

    # Parsed operations.
    handlers: list[Handler]

    # Collected model and tag names.
    request_models: set[str] = set()
    response_models: set[str] = set()
    api_tags: set[str] = set()
    all_tags: set[str] = set()
1✔
59

60

61
class Parser:
1✔
62
    BASE_PATH = Path.cwd() / "clients" / "http"
1✔
63

64
    def __init__(
        self,
        openapi_spec: str | HttpUrl,
        service_name: str,
        selected_tags: list[str] | None = None,
    ) -> None:
        """Load the specification and parse it straight away.

        Args:
            openapi_spec: Location of the spec; it is converted to ``str``
                and handed to ``SpecLoader``.
            service_name: Name of the service the spec describes.
            selected_tags: Tags the caller is interested in. ``None`` or an
                empty list means no explicit selection.
        """
        self._spec_path = str(openapi_spec)
        self._service_name = service_name

        loader = SpecLoader(self._spec_path, self._service_name)
        self.openapi_spec: dict = loader.open()

        # Placeholders; parse() fills them from the loaded document.
        self.version: str = ""
        self.description: str = ""
        self.openapi_version: str = ""
        self.handlers: list[Handler] = []

        self._selected_tags: set[str] = set(selected_tags or ())
        self.all_tags: set[str] = set()

        self.parse()
1✔
81

82
    @property
    def apis(self) -> set[str]:
        """Return the tags to work with.

        Selected tags that exist in the spec are returned. Each selected tag
        missing from the spec is reported with a warning. When no selected
        tag matches (or none was selected), every tag found in the spec is
        returned instead.
        """
        known_tags: set[str] = set()
        for tag in self._selected_tags:
            if tag in self.all_tags:
                known_tags.add(tag)
            else:
                LOGGER.warning(f"Tag {tag} not found in openapi spec")

        if known_tags:
            return known_tags

        # Only warn about the fallback when the caller asked for specific tags.
        if self._selected_tags:
            LOGGER.warning("Tags not found in openapi spec, used default tags")
        return self.all_tags
1✔
98

99
    @property
    def service_name(self) -> str:
        """Name of the service, as passed to the constructor."""
        name = self._service_name
        return name
1✔
102

103
    @property
    def client_type(self) -> str:
        """Kind of client this parser describes; always ``"http"``."""
        kind = "http"
        return kind
1✔
106

107
    def _get_request_body(self, request_body: dict | list) -> str | None:
1✔
108
        if isinstance(request_body, list):
1✔
109
            for parameter in request_body:
1✔
110
                if parameter.get("in") == "body":
1✔
111
                    schema = parameter.get("schema", {}).get("$ref", None)
×
112
                    if schema:
×
113
                        model_path = snake_to_camel(schema.split("/")[-1])
×
114
                        model_name = model_path[0].upper() + model_path[1:]
×
115
                        return model_name
×
116
        else:
117
            for content_type in request_body.get("content", {}).keys():
1✔
118
                schema = (
1✔
119
                    request_body.get("content", {})
120
                    .get(content_type, {})
121
                    .get("schema", {})
122
                    .get("$ref", None)
123
                )
124
                if schema:
1✔
125
                    model_name = snake_to_camel(schema.split("/")[-1])
1✔
126
                    return model_name
1✔
127
        return None
1✔
128

129
    def _get_response_body(self, response_body: dict) -> dict:
1✔
130
        responses: dict = {}
1✔
131
        if response_body:
1✔
132
            if self.openapi_version.startswith("3."):
1✔
133
                for status_code in response_body:
1✔
134
                    for content_type in (
1✔
135
                        response_body.get(status_code, {}).get("content", {}).keys()
136
                    ):
137
                        schema = (
1✔
138
                            response_body.get(status_code, {})
139
                            .get("content", {})
140
                            .get(content_type, {})
141
                            .get("schema", {})
142
                            .get("$ref", None)
143
                        )
144
                        model_name = schema.split("/")[-1] if schema else None
1✔
145
                        if model_name:
1✔
146
                            model_name = snake_to_camel(model_name)
1✔
147
                            responses[status_code] = model_name
1✔
148
            elif self.openapi_version.startswith("2."):
×
149
                for status_code, response in response_body.items():
×
150
                    ref_schema = response.get("schema", {}).get("$ref")
×
151
                    result_schema = response.get("schema", {}).get("result")
×
152
                    schema = ref_schema or result_schema
×
153
                    if schema:
×
154
                        model = snake_to_camel(schema.split("/")[-1])
×
155
                        responses[status_code] = model
×
156
                        model_name = model[0].upper() + model[1:]
×
157
                        responses[status_code] = model_name
×
158
        return responses
1✔
159

160
    def _get_headers(self, parameters: list[dict]) -> list[BaseParameter]:
        """Return the parameters from ``parameters`` that are sent as headers."""
        return self._get_params_with_types(
            parameters,
            param_type=ParameterType.HEADER,
        )
1✔
165

166
    def _get_path_parameters(self, parameters: list[dict]) -> list[BaseParameter]:
        """Return the parameters from ``parameters`` that are part of the path."""
        return self._get_params_with_types(
            parameters,
            param_type=ParameterType.PATH,
        )
1✔
169

170
    def _get_query_parameters(self, parameters: list[dict]) -> list[BaseParameter]:
        """Return the parameters from ``parameters`` sent in the query string."""
        return self._get_params_with_types(
            parameters,
            param_type=ParameterType.QUERY,
        )
1✔
173

174
    @staticmethod
    def _get_params_with_types(
        parameters: list[dict], param_type: ParameterType
    ) -> list[BaseParameter]:
        """Build ``BaseParameter`` objects for the parameters of one kind.

        Args:
            parameters: Raw parameter objects of an operation; may be empty.
            param_type: Only parameters whose ``in`` value equals this are
                kept.

        Returns:
            One ``BaseParameter`` per matching parameter that has a name.
            The type is resolved as follows:

            * a schema ``$ref`` wins: the type is the last segment of the
              reference and no default value is set;
            * otherwise a schema with ``anyOf`` is treated as ``"anyof"``;
            * otherwise the schema ``type`` is used.

            The resulting name is lower-cased and looked up in ``TYPE_MAP``.
            Names missing from the map (for example ``object``) fall back to
            ``"Any"`` instead of raising ``KeyError``. The default value is
            looked up in ``DEFAULT_HEADER_VALUE_MAP`` by the resolved type.
        """
        params: list[BaseParameter] = []
        if not parameters:
            return params

        for parameter in parameters:
            if parameter.get("in") != param_type:
                continue

            # Skip the parameter when its name is not defined.
            parameter_name = parameter.get("name")
            if parameter_name is None:
                continue

            schema = parameter.get("schema") or {}
            ref = schema.get("$ref")
            if ref:
                type_name = ref.split("/")[-1]
                default = None
            else:
                raw_type = "anyof" if schema.get("anyOf") else schema.get("type")
                # One normalised lookup feeds both the type and the default,
                # so differently cased type names resolve the same way.
                type_name = TYPE_MAP.get(str(raw_type).lower(), "Any")
                default = DEFAULT_HEADER_VALUE_MAP.get(type_name)

            params.append(
                BaseParameter(
                    name=parameter_name,
                    type_=type_name,
                    description=parameter.get("description", ""),
                    required=parameter.get("required", False),
                    default=default,
                )
            )

        return params
1✔
216

217
    @staticmethod
1✔
218
    def _normalize_swagger_path(path: str, fix_builtins: bool = True) -> str:
1✔
219
        def replace_placeholder(match: re.Match) -> str:
1✔
220
            placeholder = match.group(0)[1:-1]
×
221
            if not placeholder:
×
222
                return ""
×
223

224
            return (
×
225
                f"{{{rename_python_builtins(name_to_snake(placeholder))}}}"
226
                if fix_builtins
227
                else f"{{{name_to_snake(placeholder)}}}"
228
            )
229

230
        normalized_path = re.sub(r"\{[^}]*\}", replace_placeholder, path)
1✔
231
        return normalized_path
1✔
232

233
    @staticmethod
    def _extract_path_params_from_url(path: str) -> list[BaseParameter]:
        """Derive path parameters from the ``{placeholder}`` parts of ``path``.

        Each non-empty placeholder becomes a required ``str`` parameter whose
        name is the placeholder converted by ``name_to_snake``.
        """
        snake_names = (
            name_to_snake(raw_name) for raw_name in re.findall(r"\{([^}]+)\}", path)
        )
        return [
            BaseParameter(
                name=snake_name,
                type_="str",
                description=f"Path parameter: {snake_name}",
                required=True,
            )
            for snake_name in snake_names
        ]
1✔
250

251
    def parse(self) -> list[Handler]:
        """Read the spec metadata and build a handler for every operation.

        Sets ``version`` (``"1.0.0"`` when the spec has none), ``description``
        and ``openapi_version`` (from the ``openapi`` key, else ``swagger``),
        then calls ``_process_method`` for each operation under ``paths``.

        Only path-item keys that are HTTP methods with a mapping value are
        treated as operations. Other path-item keys such as ``parameters``,
        ``summary``, ``description``, ``servers``, ``$ref`` and ``x-*``
        extensions are skipped.

        Returns:
            The list of handlers collected on this parser.
        """
        http_methods = frozenset(
            {"get", "put", "post", "delete", "options", "head", "patch", "trace"}
        )

        info = self.openapi_spec.get("info", {})
        self.version = info.get("version", "1.0.0")
        self.description = info.get("description", "")
        self.openapi_version = self.openapi_spec.get(
            "openapi", ""
        ) or self.openapi_spec.get("swagger", "")

        if self.openapi_version.startswith("2."):
            LOGGER.warning(
                "OpenAPI/Swagger version 2.0 is not supported. "
                "You may convert it to 3.0 with https://converter.swagger.io/ "
                "and set the local spec path in 'swagger' option in nuke.toml!"
            )

        paths = self.openapi_spec.get("paths", {})
        for path, path_item in paths.items():
            # Extension keys under "paths" may hold non-mapping values.
            if not isinstance(path_item, dict):
                continue
            for method, details in path_item.items():
                # A path item mixes operations with shared fields; calling
                # _process_method on those would crash or add bogus handlers.
                if str(method).lower() not in http_methods:
                    continue
                if not isinstance(details, dict):
                    continue
                self._process_method(path, method, details)
        return self.handlers
1✔
271

272
    def _process_method(self, path: str, method: str, details: dict) -> None:
        """Turn one operation into a ``Handler`` and append it to ``handlers``.

        The operation's tags are also added to ``all_tags``. When the
        operation declares no path parameters, they are derived from the
        placeholders in ``path`` instead.

        Args:
            path: URL template of the operation as written in the spec.
            method: Method key of the operation.
            details: The operation object.
        """
        tags = details.get("tags", [])
        self.all_tags.update(tags)

        declared = details.get("parameters", [])
        query_parameters = self._get_query_parameters(declared)
        path_parameters = self._get_path_parameters(declared)
        headers = self._get_headers(declared)

        # Without a "requestBody" the parameter list is searched for a body
        # parameter instead.
        body_source = details.get("requestBody", details.get("parameters", {}))
        request_body = self._get_request_body(body_source)
        responses = self._get_response_body(details.get("responses", {}))

        if not path_parameters:
            path_parameters = self._extract_path_params_from_url(path)

        self.handlers.append(
            Handler(
                path=self._normalize_swagger_path(path),
                method=method,
                tags=tags,
                summary=details.get("summary", ""),
                operation_id=details.get("operationId", ""),
                query_parameters=query_parameters,
                headers=headers,
                path_parameters=path_parameters,
                request_body=request_body,
                responses=responses,
            )
        )
1✔
304

305
    def request_models(self) -> set[str]:
        """Return the distinct request model names used by all handlers."""
        return {
            handler.request_body
            for handler in self.handlers
            if handler.request_body is not None
        }
1✔
311

312
    def response_models(self) -> set[str]:
        """Return the distinct response model names used by all handlers."""
        per_handler = (
            handler.responses.values()
            for handler in self.handlers
            if handler.responses is not None
        )
        collected: set[str] = set()
        return collected.union(*per_handler)
1✔
318

319
    def models_by_tag(self, tag: str) -> set[str]:
        """Collect the model names used by the handlers carrying ``tag``.

        A parameter type counts as a model when it is not one of the values
        of ``TYPE_MAP``. Request and response model names are always
        included.
        """
        collected: set[str] = set()
        for handler in self.handlers:
            if tag not in handler.tags:
                continue

            parameter_groups = (
                handler.path_parameters,
                handler.query_parameters,
                handler.headers,
            )
            for group in parameter_groups:
                if group is None:
                    continue
                for parameter in group:
                    type_name = parameter.type
                    if type_name not in TYPE_MAP.values():
                        collected.add(type_name)

            if handler.request_body is not None:
                collected.add(handler.request_body)
            if handler.responses is not None:
                collected.update(handler.responses.values())
        return collected
1✔
343

344
    def handlers_by_tag(self, tag: str) -> list[Handler]:
        """Return the handlers whose tag list contains ``tag``, in order."""
        tagged: list[Handler] = []
        for handler in self.handlers:
            if tag in handler.tags:
                tagged.append(handler)
        return tagged
1✔
346

347
    def handlers_by_method(self, method: str) -> list[Handler]:
        """Return the handlers whose method equals ``method`` exactly."""
        same_method: list[Handler] = []
        for handler in self.handlers:
            if handler.method == method:
                same_method.append(handler)
        return same_method
×
349

350
    def handler_by_path(self, path: str) -> list[Handler]:
        """Return the handlers whose stored path equals ``path`` exactly."""
        same_path: list[Handler] = []
        for handler in self.handlers:
            if handler.path == path:
                same_path.append(handler)
        return same_path
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc