• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

alorence / django-modern-rpc / 18452590329

pending completion
18452590329

Pull #121

github

web-flow
Merge 7fa0c1b6f into a7cf0b823
Pull Request #121: Bump ruff from 0.13.3 to 0.14.0

166 of 166 branches covered (100.0%)

1383 of 1397 relevant lines covered (99.0%)

11.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.71
modernrpc/xmlrpc/backends/xmltodict.py
1
# PEP 585: use of list[Any] instead of List[Any] is available since Python 3.9, enable it for older versions
2
# PEP 604: use of typeA | typeB is available since Python 3.10, enable it for older versions
3
from __future__ import annotations
13✔
4

5
import base64
13✔
6
import xml.parsers.expat
13✔
7
from collections import OrderedDict
13✔
8
from datetime import datetime
13✔
9
from functools import cached_property
13✔
10
from io import StringIO
13✔
11
from typing import TYPE_CHECKING, Any, Callable, Literal, Sequence
13✔
12

13
import defusedxml.ElementTree
13✔
14
import xmltodict
13✔
15
from django.utils.module_loading import import_string
13✔
16

17
from modernrpc.exceptions import RPCInsecureRequest, RPCInvalidRequest, RPCMarshallingError, RPCParseError
13✔
18
from modernrpc.helpers import ensure_sequence, first
13✔
19
from modernrpc.types import CustomKwargs, DictStrAny, RpcErrorResult
13✔
20
from modernrpc.xmlrpc.backends.constants import MAXINT, MININT
13✔
21
from modernrpc.xmlrpc.handler import XmlRpcRequest
13✔
22

23
try:
13✔
24
    # types.NoneType is available only with Python 3.10+
25
    from types import NoneType
13✔
26
except ImportError:
1✔
27
    NoneType = type(None)  # type: ignore[misc]
1✔
28

29

30
if TYPE_CHECKING:
31
    from modernrpc.xmlrpc.handler import XmlRpcResult
32

33

34
# Signature of the loader callables stored in Unmarshaller.load_funcs: each one receives the content
# parsed for a single XML-RPC tag and returns the decoded Python object.
LoadFuncType = Callable[[Any], Any]
13✔
35
# Signature of the dumper callables stored in Marshaller.dump_funcs: each one receives a Python value
# and returns a dict keyed by the XML-RPC tag used to represent it.
DumpFuncType = Callable[[Any], dict]
13✔
36

37

38
class Unmarshaller:
    """Turn the nested dict built by xmltodict from an XML-RPC request into an ``XmlRpcRequest``.

    Each XML-RPC tag name is mapped (see ``load_funcs``) to the method decoding its content.
    Empty elements are handled explicitly: xmltodict reports them as ``None``, which must be
    decoded as an empty string / bytes / array / struct rather than rejected.
    """

    def __init__(self) -> None:
        # Tag name -> loader. Subclasses may extend or override entries to customize decoding.
        self.load_funcs: dict[str, LoadFuncType] = {
            "value": self.load_value,
            "nil": self.load_nil,
            "boolean": self.load_bool,
            "int": self.load_int,
            "i4": self.load_int,
            "double": self.load_float,
            "string": self.load_str,
            "dateTime.iso8601": self.load_datetime,
            "base64": self.load_base64,
            "array": self.load_array,
            "struct": self.load_struct,
        }

    def dict_to_request(self, data: dict[str, Any]) -> XmlRpcRequest:
        """Build the request object from the parsed ``methodCall`` document.

        :param data: dict holding the parsed XML payload
        :return: an XmlRpcRequest carrying the method name and the decoded positional arguments
        :raises RPCInvalidRequest: when ``methodCall`` or ``methodName`` is missing, or when a
            parameter uses an unsupported type tag
        """
        try:
            method_call = data["methodCall"]
        except KeyError as exc:
            raise RPCInvalidRequest("missing methodCall tag", data=data) from exc

        try:
            method_name = method_call["methodName"]
        except KeyError as exc:
            raise RPCInvalidRequest("missing methodCall.methodName tag", data=data) from exc

        # "params" may be absent or empty: both mean "no argument"
        params = method_call.get("params") or {}
        param_list: Sequence[DictStrAny] = params.get("param", [])
        if len(param_list) == 1:
            # A lone <param> is expected to be a single dict (its only key being "value") rather
            # than a list; ensure_sequence presumably wraps it so the loop below sees one param.
            param_list = ensure_sequence(param_list)

        args: list[Any] = []
        for param in param_list:
            _type, v = first(param.items())
            args.append(self.dispatch(_type, v))

        return XmlRpcRequest(method_name=method_name, args=args)

    def dispatch(self, _type: str, value: Any) -> Any:
        """Decode ``value`` with the loader registered for the tag ``_type``.

        :raises RPCInvalidRequest: when no loader is registered for ``_type``
        """
        try:
            load_func = self.load_funcs[_type]
        except KeyError as exc:
            raise RPCInvalidRequest(f"Unsupported type {_type}") from exc

        return load_func(value)

    def load_value(self, data: dict | str | None) -> Any:
        """Decode the content of a ``<value>`` element.

        A typed value is a dict with a single ``{tag: content}`` entry. A value without any type
        tag (plain text, or nothing at all) is a string according to the XML-RPC specification.
        """
        if data is None or isinstance(data, str):
            return self.dispatch("string", data)
        _type, v = first(data.items())
        return self.dispatch(_type, v)

    @staticmethod
    def load_nil(_) -> None:
        """Decode ``<nil/>``: the content is ignored."""
        return None

    @staticmethod
    def load_int(data: str) -> int:
        """Decode ``<int>`` / ``<i4>``. Raises ValueError on a non-integer text."""
        return int(data)

    @staticmethod
    def load_bool(data: str) -> bool:
        """Decode ``<boolean>``. Raises TypeError when the text is neither "0" nor "1"."""
        if data not in ("0", "1"):
            raise TypeError("Invalid boolean value: only 0 and 1 are allowed")
        return data == "1"

    @staticmethod
    def load_float(data: str) -> float:
        """Decode ``<double>``. Raises ValueError on a non-numeric text."""
        return float(data)

    @staticmethod
    def load_str(data: str | None) -> str:
        """Decode ``<string>``; an empty element (``None``) is the empty string, not "None"."""
        return "" if data is None else str(data)

    @staticmethod
    def load_datetime(data: str) -> datetime:
        """Decode ``<dateTime.iso8601>`` given as ``YYYYMMDDTHH:MM:SS`` into a naive datetime."""
        return datetime.strptime(data, "%Y%m%dT%H:%M:%S")

    @staticmethod
    def load_base64(data: str | None) -> bytes:
        """Decode ``<base64>``; an empty element (``None``) yields empty bytes."""
        return base64.b64decode(data or "")

    def load_array(self, data: dict[str, dict[str, list[DictStrAny]]]):
        """Decode ``<array>`` into a list. An empty ``<data/>`` yields an empty list."""
        content = data["data"]
        if content is None:
            return []
        # One <value> child is a single item, several are a list: normalize before iterating
        return [self.load_value(element) for element in ensure_sequence(content["value"])]

    def load_struct(self, data: dict | None):
        """Decode ``<struct>`` into a dict. An empty ``<struct/>`` yields an empty dict.

        :raises ValueError: when a member value holds more than one typed element
        """
        if data is None:
            return {}

        res = {}
        for member in ensure_sequence(data["member"]):
            value = member["value"]
            if isinstance(value, dict) and len(value) > 1:
                raise ValueError("struct member value must contain exactly one typed element")
            res[member["name"]] = self.load_value(value)
        return res
13✔
139

140

141
class Marshaller:
13✔
142
    def __init__(self, allow_none=True) -> None:
13✔
143
        self.allow_none = allow_none
13✔
144

145
        self.dump_funcs: dict[type, DumpFuncType] = {
13✔
146
            NoneType: self.dump_nil,
147
            bool: self.dump_bool,
148
            int: self.dump_int,
149
            float: self.dump_float,
150
            str: self.dump_str,
151
            bytes: self.dump_bytearray,
152
            bytearray: self.dump_bytearray,
153
            datetime: self.dump_datetime,
154
            list: self.dump_list,
155
            tuple: self.dump_list,
156
            dict: self.dump_dict,
157
            OrderedDict: self.dump_dict,
158
        }
159

160
    def result_to_dict(self, result: XmlRpcResult) -> DictStrAny:
13✔
161
        if isinstance(result, RpcErrorResult):
13✔
162
            return {
13✔
163
                "methodResponse": {
164
                    "fault": {
165
                        "value": self.dispatch(
166
                            {
167
                                "faultCode": result.code,
168
                                "faultString": result.message,
169
                            }
170
                        ),
171
                    }
172
                }
173
            }
174

175
        return {
13✔
176
            "methodResponse": {
177
                "params": [
178
                    {"param": {"value": self.dispatch(result.data)}},
179
                ]
180
            }
181
        }
182

183
    def dispatch(self, value: Any) -> DictStrAny:
13✔
184
        try:
13✔
185
            dump_func = self.dump_funcs[type(value)]
13✔
186
        except KeyError as e:
13✔
187
            raise TypeError(f"Unsupported type: {type(value)}") from e
13✔
188
        return dump_func(value)
13✔
189

190
    def dump_nil(self, _) -> dict[Literal["nil"], None]:
13✔
191
        if self.allow_none:
13✔
192
            return {"nil": None}
13✔
193
        raise ValueError("cannot marshal None unless allow_none is enabled")
×
194

195
    @staticmethod
13✔
196
    def dump_bool(value: bool) -> dict[str, Literal[1, 0]]:
13✔
197
        return {"boolean": 1 if value else 0}
13✔
198

199
    @staticmethod
13✔
200
    def dump_int(value: int) -> dict[Literal["int"], int]:
13✔
201
        if value > MAXINT or value < MININT:
13✔
202
            raise OverflowError("int value exceeds XML-RPC limits")
13✔
203
        return {"int": value}
13✔
204

205
    @staticmethod
13✔
206
    def dump_float(value: float) -> dict[Literal["double"], float]:
13✔
207
        return {"double": value}
13✔
208

209
    @staticmethod
13✔
210
    def dump_str(value: str) -> dict[Literal["string"], str]:
13✔
211
        return {"string": value}
13✔
212

213
    @staticmethod
13✔
214
    def dump_datetime(value: datetime) -> dict[Literal["dateTime.iso8601"], str]:
13✔
215
        return {"dateTime.iso8601": value.strftime("%04Y%02m%02dT%H:%M:%S")}
13✔
216

217
    @staticmethod
13✔
218
    def dump_bytearray(value: bytes | bytearray) -> dict[Literal["base64"], str]:
13✔
219
        return {"base64": base64.b64encode(value).decode()}
13✔
220

221
    def dump_dict(self, value: dict) -> dict[Literal["struct"], dict[Literal["member"], list[DictStrAny]]]:
13✔
222
        return {
13✔
223
            "struct": {
224
                "member": [
225
                    {
226
                        "name": key,
227
                        "value": self.dispatch(val),
228
                    }
229
                    for key, val in value.items()
230
                ],
231
            },
232
        }
233

234
    def dump_list(
13✔
235
        self, value: list | tuple
236
    ) -> dict[Literal["array"], dict[Literal["data"], dict[Literal["value"], list[Any]]]]:
237
        return {
13✔
238
            "array": {
239
                "data": {
240
                    "value": [self.dispatch(val) for val in value],
241
                }
242
            },
243
        }
244

245

246
class XmlToDictDeserializer:
    """xml-rpc deserializer based on the third-party xmltodict library"""

    def __init__(
        self,
        unmarshaller_klass="modernrpc.xmlrpc.backends.xmltodict.Unmarshaller",
        unmarshaller_kwargs: CustomKwargs = None,
        load_kwargs: CustomKwargs = None,
    ):
        """
        :param unmarshaller_klass: dotted path of the unmarshaller class, resolved with import_string
        :param unmarshaller_kwargs: keyword arguments used to instantiate the unmarshaller
        :param load_kwargs: extra keyword arguments forwarded to ``xmltodict.parse``
        """
        self.unmarshaller_klass = import_string(unmarshaller_klass)
        self.unmarshaller_kwargs = unmarshaller_kwargs or {}
        self.load_kwargs = load_kwargs or {}

    @cached_property
    def unmarshaller(self):
        """Unmarshaller instance, created on first access and then reused."""
        return self.unmarshaller_klass(**self.unmarshaller_kwargs)

    def loads(self, data: str) -> XmlRpcRequest:
        """Parse an XML-RPC request payload.

        :param data: the XML document, as text
        :return: the request built by the unmarshaller
        :raises RPCParseError: when the payload is not well-formed XML
        :raises RPCInsecureRequest: when defusedxml rejects the payload
        :raises RPCInvalidRequest: when the document is valid XML but not a valid XML-RPC request,
            including scalar values that cannot be converted (bad int, float, date, base64...)
        """
        try:
            # First, parse the XML using defusedxml.ElementTree for security
            # This will raise appropriate exceptions for XML vulnerabilities
            root = defusedxml.ElementTree.fromstring(data)

            # Convert the parsed XML to a string and then parse it with xmltodict
            # This is safe because defusedxml has already validated the XML
            data = defusedxml.ElementTree.tostring(root, encoding="utf-8").decode("utf-8")
        except defusedxml.ElementTree.ParseError as exc:
            raise RPCParseError(str(exc)) from exc
        except defusedxml.DefusedXmlException as exc:
            raise RPCInsecureRequest(str(exc)) from exc

        try:
            structured_data: dict[str, Any] = xmltodict.parse(data, **self.load_kwargs)
        except xml.parsers.expat.ExpatError as exc:
            raise RPCParseError(str(exc)) from exc

        try:
            return self.unmarshaller.dict_to_request(structured_data)
        except (TypeError, ValueError) as exc:
            # The loaders signal an unusable value with TypeError (e.g. invalid boolean) or with
            # ValueError (int(), float(), strptime() and b64decode() on malformed text): both mean
            # the client sent an invalid request and must be reported as such.
            raise RPCInvalidRequest(str(exc)) from exc
13✔
286

287

288
class XmlToDictSerializer:
    """xml-rpc serializer based on the third-party xmltodict library"""

    def __init__(
        self,
        marshaller_klass="modernrpc.xmlrpc.backends.xmltodict.Marshaller",
        marshaller_kwargs: CustomKwargs = None,
        dump_kwargs: CustomKwargs = None,
    ):
        """
        :param marshaller_klass: dotted path of the marshaller class, resolved with import_string
        :param marshaller_kwargs: keyword arguments used to instantiate the marshaller
        :param dump_kwargs: extra keyword arguments forwarded to ``xmltodict.unparse``
        """
        self.marshaller_klass = import_string(marshaller_klass)
        # Falsy values (None, empty dict) are replaced by a fresh empty dict
        self.marshaller_kwargs = marshaller_kwargs if marshaller_kwargs else {}
        self.dump_kwargs = dump_kwargs if dump_kwargs else {}

    @cached_property
    def marshaller(self):
        """Marshaller instance, created on first access and then reused."""
        klass = self.marshaller_klass
        return klass(**self.marshaller_kwargs)

    def dumps(self, result: XmlRpcResult) -> str:
        """Serialize a result into an XML-RPC response document.

        :param result: the result to serialize
        :return: the XML document, as text
        :raises RPCMarshallingError: when the marshaller fails, whatever the underlying exception
        """
        try:
            payload = self.marshaller.result_to_dict(result)
        except Exception as err:
            raise RPCMarshallingError(result.data, err) from err

        # xmltodict writes into the in-memory buffer, whose content is returned as a string
        with StringIO() as buffer:
            xmltodict.unparse(payload, output=buffer, **self.dump_kwargs)
            return buffer.getvalue()
13✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc