• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

alorence / django-modern-rpc / 19383791482

10 Nov 2025 12:28PM UTC coverage: 99.018% (+0.1%) from 98.878%
19383791482

push

github

alorence
typo

178 of 179 branches covered (99.44%)

1412 of 1426 relevant lines covered (99.02%)

29.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.53
modernrpc/xmlrpc/backends/marshalling.py
1
from __future__ import annotations
30✔
2

3
import base64
30✔
4
from collections import OrderedDict
30✔
5
from datetime import datetime
30✔
6
from typing import Any, Callable, Generic, Iterable, Protocol, TypeVar
30✔
7

8
from modernrpc.exceptions import RPCInvalidRequest
30✔
9
from modernrpc.helpers import first
30✔
10
from modernrpc.types import DictStrAny, RpcErrorResult
30✔
11
from modernrpc.xmlrpc.backends.constants import MAXINT, MININT
30✔
12
from modernrpc.xmlrpc.handler import XmlRpcRequest, XmlRpcResult
30✔
13

14
# Self is available in typing base module only from Python 3.11
15
try:
30✔
16
    from typing import Self
30✔
17
except ImportError:
15✔
18
    from typing_extensions import Self
15✔
19

20
try:
30✔
21
    # types.NoneType is available only with Python 3.10+
22
    from types import NoneType
30✔
23
except ImportError:
8✔
24
    NoneType = type(None)  # type: ignore[misc]
8✔
25

26

27
class ElementTypeProtocol(Protocol, Iterable):
30✔
28
    """
29
    Base protocol for XML element types. This reflects the API of both the xml.etree and the lxml library.
30
    Unfortunately, since both libraries share the same interface without inheriting a common base class, we
31
    have to define our own.
32
    """
33

34
    tag: str
30✔
35
    text: str
30✔
36

37
    def find(self, path: str, namespaces=None) -> Self | None: ...
4✔
38
    def findall(self, path: str, namespaces=None) -> list[Self]: ...
4✔
39
    def append(self, sub_element: Self) -> None: ...
4✔
40

41

42
ElementType = TypeVar("ElementType", bound=ElementTypeProtocol)
30✔
43

44
LoadFuncType = Callable[[ElementType], Any]
30✔
45
DumpFuncType = Callable[[Any], ElementType]
30✔
46

47

48
class EtreeElementUnmarshaller(Generic[ElementType]):
30✔
49
    def __init__(self, allow_none=True) -> None:
30✔
50
        self.allow_none = allow_none
30✔
51

52
        self.load_funcs: dict[str, LoadFuncType] = {
30✔
53
            "value": self.load_value,
54
            "nil": self.load_nil,
55
            "boolean": self.load_bool,
56
            "int": self.load_int,
57
            "i4": self.load_int,
58
            "double": self.load_float,
59
            "string": self.load_str,
60
            "dateTime.iso8601": self.load_datetime,
61
            "base64": self.load_base64,
62
            "array": self.load_array,
63
            "struct": self.load_struct,
64
        }
65

66
    def element_to_request(self, root: ElementType) -> XmlRpcRequest:
30✔
67
        if root.tag != "methodCall":
30✔
68
            raise RPCInvalidRequest("missing methodCall tag", data=root)
30✔
69

70
        method_name = root.find("./methodName")
30✔
71
        if method_name is None:
30✔
72
            raise RPCInvalidRequest("missing methodCall.methodName tag", data=root)
30✔
73

74
        params = root.find("./params")
30✔
75
        if params is None:
30✔
76
            return XmlRpcRequest(method_name=self.stripped_text(method_name))
30✔
77

78
        param_list = params.findall("./param")
30✔
79

80
        args = [self.dispatch(self.first_child(param)) for param in param_list]
30✔
81
        return XmlRpcRequest(method_name=self.stripped_text(method_name), args=args)
30✔
82

83
    @staticmethod
30✔
84
    def stripped_text(elt: ElementType) -> str:
30✔
85
        return elt.text.strip() if elt.text else ""
30✔
86

87
    @staticmethod
30✔
88
    def first_child(elt: ElementType) -> ElementType:
30✔
89
        try:
30✔
90
            return first(elt)
30✔
91
        except IndexError as ie:
×
92
            raise RPCInvalidRequest("missing child element", data=elt) from ie
×
93

94
    def dispatch(self, elt: ElementType) -> Any:
30✔
95
        try:
30✔
96
            load_func = self.load_funcs[elt.tag]
30✔
97
        except KeyError as exc:
30✔
98
            raise RPCInvalidRequest(f"Unsupported type {elt.tag}") from exc
30✔
99

100
        return load_func(elt)
30✔
101

102
    def load_value(self, element: ElementType) -> Any:
30✔
103
        return self.dispatch(self.first_child(element))
30✔
104

105
    def load_nil(self, _: ElementType) -> None:
30✔
106
        if self.allow_none:
30✔
107
            return
30✔
108
        raise ValueError("cannot marshal None unless allow_none is enabled")
×
109

110
    def load_int(self, elt: ElementType) -> int:
30✔
111
        return int(self.stripped_text(elt))
30✔
112

113
    def load_bool(self, elt: ElementType) -> bool:
30✔
114
        value = self.stripped_text(elt)
30✔
115
        if value not in ("0", "1"):
30✔
116
            raise TypeError(f"Invalid boolean value: only 0 and 1 are allowed, found {value}")
30✔
117
        return value == "1"
30✔
118

119
    def load_float(self, elt: ElementType) -> float:
30✔
120
        return float(self.stripped_text(elt))
30✔
121

122
    def load_str(self, elt: ElementType) -> str:
30✔
123
        return str(self.stripped_text(elt))
30✔
124

125
    def load_datetime(self, elt: ElementType) -> datetime:
30✔
126
        return datetime.strptime(self.stripped_text(elt), "%Y%m%dT%H:%M:%S")
30✔
127

128
    def load_base64(self, elt: ElementType) -> bytes:
30✔
129
        return base64.b64decode(self.stripped_text(elt))
30✔
130

131
    def load_array(self, elt: ElementType) -> list[Any]:
30✔
132
        return [self.dispatch(value_elt) for value_elt in elt.findall("./data/value")]
30✔
133

134
    def load_struct(self, elt: ElementType) -> DictStrAny:
30✔
135
        member_names_and_values = [self.load_struct_member(member) for member in elt.findall("./member")]
30✔
136
        return dict(member_names_and_values)
30✔
137

138
    def load_struct_member(self, member_elt: ElementType) -> tuple[str, Any]:
30✔
139
        member_name = member_elt.find("./name")
30✔
140
        if member_name is None:
30✔
141
            raise RPCInvalidRequest("missing member.name tag", data=member_elt)
×
142
        value = member_elt.find("./value")
30✔
143
        if value is None:
30✔
144
            raise RPCInvalidRequest("missing member.value tag", data=member_elt)
×
145

146
        return self.stripped_text(member_name), self.dispatch(value)
30✔
147

148

149
class EtreeElementMarshaller(Generic[ElementType]):
30✔
150
    def __init__(
30✔
151
        self,
152
        element_factory: Callable[[str], ElementType],
153
        sub_element_factory: Callable[[ElementType, str], ElementType],
154
        allow_none=True,
155
    ) -> None:
156
        self.element_factory = element_factory
30✔
157
        self.sub_element_factory = sub_element_factory
30✔
158

159
        self.allow_none = allow_none
30✔
160

161
        self.dump_funcs: dict[type, DumpFuncType] = {
30✔
162
            NoneType: self.dump_nil,
163
            bool: self.dump_bool,
164
            int: self.dump_int,
165
            float: self.dump_float,
166
            str: self.dump_str,
167
            bytes: self.dump_bytearray,
168
            bytearray: self.dump_bytearray,
169
            datetime: self.dump_datetime,
170
            list: self.dump_list,
171
            tuple: self.dump_list,
172
            dict: self.dump_dict,
173
            OrderedDict: self.dump_dict,
174
        }
175

176
    def result_to_element(self, result: XmlRpcResult) -> ElementType:
30✔
177
        """Convert an XmlRpcResult to an XML element."""
178
        root = self.element_factory("methodResponse")
30✔
179

180
        if isinstance(result, RpcErrorResult):
30✔
181
            fault = self.sub_element_factory(root, "fault")
30✔
182
            value = self.sub_element_factory(fault, "value")
30✔
183

184
            struct = self.sub_element_factory(value, "struct")
30✔
185

186
            # Add faultCode member
187
            member = self.sub_element_factory(struct, "member")
30✔
188
            name = self.sub_element_factory(member, "name")
30✔
189
            name.text = "faultCode"
30✔
190
            value = self.sub_element_factory(member, "value")
30✔
191
            int_el = self.sub_element_factory(value, "int")
30✔
192
            int_el.text = str(result.code)
30✔
193

194
            # Add faultString member
195
            member = self.sub_element_factory(struct, "member")
30✔
196
            name = self.sub_element_factory(member, "name")
30✔
197
            name.text = "faultString"
30✔
198
            value = self.sub_element_factory(member, "value")
30✔
199
            string = self.sub_element_factory(value, "string")
30✔
200
            string.text = result.message
30✔
201
        else:
202
            params = self.sub_element_factory(root, "params")
30✔
203
            param = self.sub_element_factory(params, "param")
30✔
204
            value = self.sub_element_factory(param, "value")
30✔
205

206
            # Add the result data
207
            value.append(self.dispatch(result.data))
30✔
208

209
        return root
30✔
210

211
    def dispatch(self, value: Any) -> ElementType:
30✔
212
        """Dispatch a value to the appropriate dump method."""
213
        try:
30✔
214
            dump_func = self.dump_funcs[type(value)]
30✔
215
        except KeyError as exc:
30✔
216
            raise TypeError(f"Unsupported type: {type(value)}") from exc
30✔
217

218
        return dump_func(value)
30✔
219

220
    def dump_nil(self, _: None) -> ElementType:
30✔
221
        if self.allow_none:
30✔
222
            return self.element_factory("nil")
30✔
223
        raise ValueError("cannot marshal None unless allow_none is enabled")
×
224

225
    def dump_bool(self, value: bool) -> ElementType:
30✔
226
        boolean = self.element_factory("boolean")
30✔
227
        boolean.text = "1" if value else "0"
30✔
228
        return boolean
30✔
229

230
    def dump_int(self, value: int) -> ElementType:
30✔
231
        if value > MAXINT or value < MININT:
30✔
232
            raise OverflowError("int value exceeds XML-RPC limits")
30✔
233
        int_el = self.element_factory("int")
30✔
234
        int_el.text = str(value)
30✔
235
        return int_el
30✔
236

237
    def dump_float(self, value: float) -> ElementType:
30✔
238
        double = self.element_factory("double")
30✔
239
        double.text = str(value)
30✔
240
        return double
30✔
241

242
    def dump_str(self, value: str) -> ElementType:
30✔
243
        string = self.element_factory("string")
30✔
244
        string.text = value
30✔
245
        return string
30✔
246

247
    def dump_datetime(self, value: datetime) -> ElementType:
30✔
248
        dt = self.element_factory("dateTime.iso8601")
30✔
249
        dt.text = value.strftime("%04Y%02m%02dT%H:%M:%S")
30✔
250
        return dt
30✔
251

252
    def dump_bytearray(self, value: bytes | bytearray) -> ElementType:
30✔
253
        b64 = self.element_factory("base64")
30✔
254
        b64.text = base64.b64encode(value).decode()
30✔
255
        return b64
30✔
256

257
    def dump_dict(self, value: dict) -> ElementType:
30✔
258
        struct = self.element_factory("struct")
30✔
259
        for key, val in value.items():
30✔
260
            member = self.sub_element_factory(struct, "member")
30✔
261
            name = self.sub_element_factory(member, "name")
30✔
262
            name.text = str(key)
30✔
263
            value_el = self.sub_element_factory(member, "value")
30✔
264
            value_el.append(self.dispatch(val))
30✔
265
        return struct
30✔
266

267
    def dump_list(self, value: list | tuple) -> ElementType:
30✔
268
        array = self.element_factory("array")
30✔
269
        data = self.sub_element_factory(array, "data")
30✔
270
        for val in value:
30✔
271
            value_el = self.sub_element_factory(data, "value")
30✔
272
            value_el.append(self.dispatch(val))
30✔
273
        return array
30✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc