• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tcalmant / python-javaobj / 13854142564

14 Mar 2025 09:56AM CUT coverage: 78.494% (-0.2%) from 78.701%
13854142564

push

github

web-flow
Merge pull request #59 from tcalmant/project-overhaul-2

Project overhaul 2

1595 of 2032 relevant lines covered (78.49%)

2.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.49
/javaobj/utils.py
1
#!/usr/bin/python
2
# -- Content-Encoding: utf-8 --
3
"""
3✔
4
Provides utility methods used by the core implementation of javaobj.
5

6
Namely: logging methods, bytes/str/unicode converters
7

8
:authors: Thomas Calmant
9
:license: Apache License 2.0
10
:version: 0.4.4
11
:status: Alpha
12

13
..
14

15
    Copyright 2024 Thomas Calmant
16

17
    Licensed under the Apache License, Version 2.0 (the "License");
18
    you may not use this file except in compliance with the License.
19
    You may obtain a copy of the License at
20

21
        http://www.apache.org/licenses/LICENSE-2.0
22

23
    Unless required by applicable law or agreed to in writing, software
24
    distributed under the License is distributed on an "AS IS" BASIS,
25
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
26
    See the License for the specific language governing permissions and
27
    limitations under the License.
28
"""
29

30
from __future__ import absolute_import
3✔
31

32
# Standard library
33
from typing import IO, Tuple  # noqa: F401
3✔
34
import gzip
3✔
35
import logging
3✔
36
import os
3✔
37
import struct
3✔
38
import sys
3✔
39

40
# Modified UTF-8 parser
41
from .modifiedutf8 import byte_to_int, decode_modified_utf8
3✔
42

43
# ------------------------------------------------------------------------------
44

45
# Module version
46
__version_info__ = (0, 4, 4)
3✔
47
__version__ = ".".join(str(x) for x in __version_info__)
3✔
48

49
# Documentation strings format
50
__docformat__ = "restructuredtext en"
3✔
51

52
# ------------------------------------------------------------------------------
53

54
# Setup the logger
55
_log = logging.getLogger("javaobj")
3✔
56

57

58
def log_debug(message, ident=0):
    """
    Writes a message to the module logger at DEBUG level.

    The message is prefixed with two spaces per indentation level.

    :param message: Message to log
    :param ident: Indentation level (each level adds two spaces)
    """
    padding = " " * (ident * 2)
    _log.debug("%s%s", padding, message)
3✔
66

67

68
def log_error(message, ident=0):
    """
    Writes a message to the module logger at ERROR level.

    The message is prefixed with two spaces per indentation level.

    :param message: Message to log
    :param ident: Indentation level (each level adds two spaces)
    """
    padding = " " * (ident * 2)
    _log.error("%s%s", padding, message)
3✔
76

77

78
# ------------------------------------------------------------------------------
79

80

81
def read_struct(data, fmt_str):
    # type: (bytes, str) -> Tuple
    """
    Unpacks a structure from the beginning of the given bytes.

    The number of bytes consumed is the size computed by ``struct.calcsize``
    for the given format; everything after it is handed back untouched.

    :param data: Data as bytes
    :param fmt_str: Struct unpack format string
    :return: A tuple (unpacked values as tuple, remaining data)
    :raise struct.error: Not enough data for the given format
    """
    consumed = struct.calcsize(fmt_str)
    head, tail = data[:consumed], data[consumed:]
    values = struct.unpack(fmt_str, head)
    return values, tail
3✔
93

94

95
def read_string(data, length_fmt="H"):
    # type: (bytes, str) -> Tuple[UNICODE_TYPE, bytes]
    """
    Reads a length-prefixed serialized string.

    The length is read first as a big-endian value of the given struct
    format, then that many bytes are taken and converted with
    ``to_unicode``.

    :param data: Bytes where to read the string from
    :param length_fmt: Structure format of the string length (H or Q)
    :return: A tuple (deserialized string, remaining data)
    """
    fields, remaining = read_struct(data, ">{0}".format(length_fmt))
    # The length format must describe exactly one value
    (length,) = fields
    raw_string = remaining[:length]
    return to_unicode(raw_string), remaining[length:]
3✔
107

108

109
# ------------------------------------------------------------------------------
110

111

112
def java_data_fd(original_df):
    # type: (IO[bytes]) -> IO[bytes]
    """
    Ensures that the input file descriptor contains a Java serialized content.
    Automatically uncompresses GZipped data.

    The first two bytes are peeked, then the descriptor is moved back to the
    position it had when this function was called.

    :param original_df: Input file descriptor (must support tell and seek)
    :return: Input file descriptor or a fake one to access uncompressed data
    :raise IOError: Error reading input file
    """
    # Peek at the first bytes, then rewind to where we started.
    # bytearray gives integer-comparable content on both Python 2 and 3 and
    # is simply shorter (possibly empty) when the stream is too short.
    start_idx = original_df.tell()
    magic_header = bytearray(original_df.read(2))
    original_df.seek(start_idx, os.SEEK_SET)

    if magic_header[:2] == b"\x1f\x8b":
        # GZip magic number: wrap the descriptor to uncompress on the fly
        return gzip.GzipFile(fileobj=original_df, mode="rb")  # type: ignore

    # Either a raw serialized stream (first byte is 0xAC) or unknown/too
    # short content: in the latter case, let the parser raise the error
    return original_df
×
137

138

139
# ------------------------------------------------------------------------------
140

141

142
def hexdump(src, start_offset=0, length=16):
    # type: (str, int, int) -> str
    """
    Builds an hexadecimal dump of the given data.

    Each output line holds the offset of its first byte (4 hexadecimal
    digits), the hexadecimal value of each byte, and a printable rendering
    where characters without a 3-character ``repr`` are shown as a dot.

    :param src: A string containing binary data
    :param start_offset: The start offset of the source
    :param length: Length of a dump line
    :return: A dump string
    """
    # Translation table: character code -> character to display
    printable_table = "".join(
        chr(code) if len(repr(chr(code))) == 3 else "."
        for code in range(256)
    )
    # The hexadecimal column is padded to 3 characters per dumped byte
    line_pattern = "{{0:04X}}   {{1:<{0}}}  {{2}}\n".format(length * 3)

    # Convert raw data to str (Python 3 compatibility)
    text = to_str(src, "latin-1")

    lines = []
    for offset in range(0, len(text), length):
        chunk = text[offset : offset + length]
        hex_part = " ".join("{0:02X}".format(ord(char)) for char in chunk)
        shown = chunk.translate(printable_table)
        lines.append(line_pattern.format(offset + start_offset, hex_part, shown))

    return "".join(lines)
3✔
168

169

170
# ------------------------------------------------------------------------------
171

172

173
if sys.version_info[0] >= 3:
    BYTES_TYPE = bytes  # pylint:disable=C0103
    UNICODE_TYPE = str  # pylint:disable=C0103
    unicode_char = chr  # pylint:disable=C0103

    def bytes_char(c):
        """
        Converts the given byte value to a bytes string of length 1

        :param c: An integer in range(256)
        :return: The corresponding single-byte bytes string
        """
        return bytes((c,))

    # Python 3 interpreter : bytes & str
    def to_bytes(data, encoding="UTF-8"):
        """
        Converts the given string to an array of bytes.
        Returns the first parameter if it is already an array of bytes
        (``bytes`` or one of its subclasses).

        :param data: A unicode string
        :param encoding: The encoding of data
        :return: The corresponding array of bytes
        """
        if isinstance(data, bytes):
            # Nothing to do
            return data
        return data.encode(encoding)

    def to_str(data, encoding="UTF-8"):
        """
        Converts the given parameter to a string.
        Returns the first parameter if it is already an instance of ``str``
        (subclasses included).

        If the data can't be decoded with the given encoding, it is decoded
        with the modified UTF-8 parser instead.

        :param data: A string or an array of bytes
        :param encoding: The encoding of data
        :return: The corresponding string
        """
        if isinstance(data, str):
            # Nothing to do (str() can't decode a str and would raise)
            return data
        try:
            return str(data, encoding)
        except UnicodeDecodeError:
            return decode_modified_utf8(data)[0]

    # Same operation
    to_unicode = to_str  # pylint:disable=C0103

    def read_to_str(data):
        """
        Concats all bytes into a string, one character per byte value

        :param data: An iterable of integers (e.g. a bytes string)
        :return: The corresponding string
        """
        return "".join(chr(char) for char in data)

else:
    BYTES_TYPE = str  # pylint:disable=C0103
    UNICODE_TYPE = (
        unicode  # pylint:disable=C0103,undefined-variable  # noqa: F821
    )
    unicode_char = (
        unichr  # pylint:disable=C0103,undefined-variable  # noqa: F821
    )
    bytes_char = chr  # pylint:disable=C0103

    # Python 2 interpreter : str & unicode
    def to_str(data, encoding="UTF-8"):
        """
        Converts the given parameter to a string.
        Returns the first parameter if it is already an instance of ``str``
        (subclasses included).

        :param data: A string
        :param encoding: The encoding of data
        :return: The corresponding string
        """
        if isinstance(data, str):
            # Nothing to do
            return data
        return data.encode(encoding)

    # Same operation
    to_bytes = to_str  # pylint:disable=C0103

    # Python 2 interpreter : str & unicode
    def to_unicode(data, encoding="UTF-8"):
        """
        Converts the given parameter to a unicode string.
        Returns the first parameter if it is already an instance of
        ``unicode`` (subclasses included).

        If the data can't be decoded with the given encoding, it is decoded
        with the modified UTF-8 parser instead.

        :param data: A string
        :param encoding: The encoding of data
        :return: The corresponding unicode string
        """
        if isinstance(data, UNICODE_TYPE):
            # Nothing to do
            return data
        try:
            return data.decode(encoding)
        except UnicodeDecodeError:
            return decode_modified_utf8(data)[0]

    def read_to_str(data):
        """
        Nothing to do in Python 2
        """
        return data
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc