• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bloomberg / pybossa / 19204199042

09 Nov 2025 05:52AM UTC coverage: 94.044% (-0.02%) from 94.065%
19204199042

Pull #1073

github

dchhabda
fix tests
Pull Request #1073: deprecate old boto. use boto3 only

94 of 107 new or added lines in 5 files covered. (87.85%)

3 existing lines in 2 files now uncovered.

17890 of 19023 relevant lines covered (94.04%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.38
/pybossa/cloud_store_api/base_conn.py
1
import io
1✔
2
import logging
1✔
3
import zlib
1✔
4
from abc import ABC, abstractmethod
1✔
5

6
from boto3.s3.transfer import TransferConfig
1✔
7
from botocore.exceptions import ClientError
1✔
8

9
logger = logging.getLogger(__name__)
1✔
10

11

12
class BaseConnection(ABC):
1✔
13
    @abstractmethod
1✔
14
    def __init__(
1✔
15
        self,
16
    ):
17
        self.client = None
1✔
18

19
    def get_key(self, bucket, path, **kwargs):
1✔
20
        try:
1✔
21
            fobj = self.client.get_object(
1✔
22
                Bucket=bucket,
23
                Key=path,
24
                **kwargs,
25
            )
26
            return fobj
1✔
27
        except ClientError as e:
×
28
            if "Error" in e.response:
×
29
                err_resp = e.response["Error"]
×
30
                http_status = e.response.get("ResponseMetadata", {}).get(
×
31
                    "HTTPStatusCode"
32
                )
33
                logger.warning(
×
34
                    "%s: %s, key %s. http status %d",
35
                    self.__class__.__name__,
36
                    str(e),
37
                    err_resp.get("Key", path),
38
                    http_status,
39
                )
40
            raise
×
41

42
    def get_contents(self, bucket, path, **kwargs):
1✔
43
        return self.get_contents_as_string(
×
44
            bucket=bucket, path=path, encoding=None, kwargs=kwargs
45
        )
46

47
    def get_head(self, bucket, path, **kwargs):
1✔
48
        return self.client.head_object(Bucket=bucket, Key=path, **kwargs)
1✔
49

50
    def get_contents_as_string(self, bucket, path, encoding="utf-8", **kwargs):
1✔
51
        """Returns contents as bytes or a string, depending on parameter
52
        "encoding". If encoding is None, returns bytes, otherwise, returns
53
        a string
54
        """
55
        try:
1✔
56
            fobj = self.client.get_object(
1✔
57
                Bucket=bucket,
58
                Key=path,
59
                **kwargs,
60
            )
61
            content = fobj["Body"].read()
1✔
62

63
            if encoding is None:
1✔
64
                return content
1✔
65

66
            if fobj.get("ContentEncoding") == "gzip":
×
67
                decompress_bits = (
×
68
                    32 + 15
69
                )  # https://docs.python.org/3/library/zlib.html#zlib.decompress
70
                content = zlib.decompress(content, decompress_bits)
×
71

72
            return content.decode(encoding)
×
73
        except ClientError as e:
×
74
            if "Error" in e.response:
×
75
                err_resp = e.response["Error"]
×
76
                http_status = e.response.get("ResponseMetadata", {}).get(
×
77
                    "HTTPStatusCode"
78
                )
79
                logger.warning(
×
80
                    "%s: %s, key %s. http status %d",
81
                    self.__class__.__name__,
82
                    str(e),
83
                    err_resp.get("Key", path),
84
                    http_status,
85
                )
86
            raise
×
87

88
    def set_contents(self, bucket, path, content, **kwargs):
1✔
89
        if type(content) == str:
1✔
90
            content = content.encode()
1✔
91
        try:
1✔
92
            source = io.BytesIO(content)
1✔
93
            config = TransferConfig(multipart_threshold=float("inf"))
1✔
94
            self.client.upload_fileobj(
1✔
95
                source, bucket, path, Config=config, ExtraArgs=kwargs
96
            )
97
        except ClientError as e:
×
98
            if "Error" in e.response:
×
99
                err_resp = e.response["Error"]
×
100
                http_status = e.response.get("ResponseMetadata", {}).get(
×
101
                    "HTTPStatusCode"
102
                )
103
                logger.warning(
×
104
                    "%s: %s, key %s. http status %d",
105
                    self.__class__.__name__,
106
                    str(e),
107
                    err_resp.get("Key", path),
108
                    http_status,
109
                )
110
            raise
×
111

112
    def set_contents_from_file(self, source_file, bucket, path, **kwargs):
1✔
113
        try:
1✔
114
            headers = kwargs.get("ExtraArgs", {}).get("headers", {})
1✔
115
            content_type = headers.get("Content-Type")
1✔
116
            extra_args = {"ContentType": content_type} if content_type else {}
1✔
117
            self.client.upload_fileobj(
1✔
118
                source_file, bucket, path, ExtraArgs=extra_args
119
            )
120
        except ClientError as e:
×
121
            if "Error" in e.response:
×
122
                err_resp = e.response["Error"]
×
123
                http_status = e.response.get("ResponseMetadata", {}).get(
×
124
                    "HTTPStatusCode"
125
                )
126
                logger.warning(
×
127
                    "%s: %s, key %s. http status %d",
128
                    self.__class__.__name__,
129
                    str(e),
130
                    err_resp.get("Key", path),
131
                    http_status,
132
                )
133
            raise
×
134
    def delete_key(self, bucket, path, **kwargs):
1✔
135
        try:
1✔
136
            self.client.delete_object(
1✔
137
                Bucket=bucket,
138
                Key=path,
139
                **kwargs,
140
            )
141
        except ClientError as e:
1✔
142
            if "Error" in e.response:
1✔
143
                err_resp = e.response["Error"]
1✔
144
                http_status = e.response.get("ResponseMetadata", {}).get(
1✔
145
                    "HTTPStatusCode"
146
                )
147
                logger.warning(
1✔
148
                    "%s: %s, key %s. http status %d",
149
                    self.__class__.__name__,
150
                    str(e),
151
                    err_resp.get("Key", path),
152
                    http_status,
153
                )
154
            raise
1✔
155

156
    def generate_url(self, bucket: str, key: str, **kwargs) -> str:
1✔
UNCOV
157
        return self.client.generate_presigned_url(
×
158
            "get_object", Params={"Bucket": bucket, "Key": key}, **kwargs
159
        )
160

161
    def get_bucket(
1✔
162
        self, bucket_name, validate=False, **kwargs
163
    ):  # pylint: disable=W0613
UNCOV
164
        return BaseClientBucketAdapter(self, bucket_name)
×
165

166
    def new_key(self, bucket, path):
1✔
167
        try:
1✔
168
            self.client.put_object(Bucket=bucket, Key=path)
1✔
169
        except ClientError as e:
×
170
            if "Error" in e.response:
×
171
                err_resp = e.response["Error"]
×
172
                http_status = e.response.get("ResponseMetadata", {}).get(
×
173
                    "HTTPStatusCode"
174
                )
175
                logger.warning(
×
176
                    "%s: %s, key %s. http status %d",
177
                    self.__class__.__name__,
178
                    str(e),
179
                    err_resp.get("Key", path),
180
                    http_status,
181
                )
182
            raise
×
183

184
    def copy_key(self, bucket, source_key, target_key, **kwargs):
1✔
185
        try:
1✔
186
            copy_source = {"Bucket": bucket, "Key": source_key}
1✔
187
            self.client.copy(CopySource=copy_source, Bucket=bucket, Key=target_key, ExtraArgs=kwargs)
1✔
188
        except ClientError as e:
×
189
            if "Error" in e.response:
×
190
                err_resp = e.response["Error"]
×
191
                http_status = e.response.get("ResponseMetadata", {}).get(
×
192
                    "HTTPStatusCode"
193
                )
194
                logger.warning(
×
195
                    "%s: %s, key %s. http status %d",
196
                    self.__class__.__name__,
197
                    str(e),
198
                    err_resp.get("Key", path),
199
                    http_status,
200
                )
201
            raise
×
202

203

204
class BaseClientBucketAdapter:
1✔
205
    def __init__(self, base_client, bucket_name):
1✔
206
        self.connection = base_client
1✔
207
        self.name = bucket_name
1✔
208

209
    def get_key(self, key_name, *args, **kwargs):  # pylint: disable=W0613
1✔
210
        response = self.connection.get_key(self.name, key_name)
1✔
211
        return BaseClientKeyAdapter(self.connection, self.name, key_name, **response)
1✔
212

213
    def delete_key(self, key_name, **kwargs):
1✔
214
        kwargs["VersionId"] = kwargs.pop("version_id", None)
1✔
215
        self.connection.delete_key(bucket=self.name, path=key_name)
1✔
216

217
    def new_key(self, key_name, *args, **kwargs):  # pylint: disable=W0613
1✔
218
        self.connection.new_key(bucket=self.name, path=key_name, **kwargs)
1✔
219
        return BaseClientKeyAdapter(self.connection, self.name, key_name)
1✔
220

221
    def copy_key(self, source_key, target_key, **kwargs):
1✔
222
        # Handle both string key names and Key objects
223
        source_key_name = source_key.name if hasattr(source_key, 'name') else source_key
1✔
224
        target_key_name = target_key.name if hasattr(target_key, 'name') else target_key
1✔
225
        self.connection.copy_key(self.name, source_key_name, target_key_name, **kwargs)
1✔
226

227

228
class BaseClientKeyAdapter:
1✔
229
    def __init__(self, base_client, bucket_name, key_name, **kwargs):
1✔
230
        self.base_client = base_client
1✔
231
        self.bucket = bucket_name
1✔
232
        self.name = key_name
1✔
233
        self.version_id = kwargs.get("VersionId")
1✔
234
        self.content_type = kwargs.get("ContentType")
1✔
235
        self.content_encoding = kwargs.get("ContentEncoding")
1✔
236
        self.content_language = kwargs.get("ContentLanguage")
1✔
237

238
    def get_contents_as_string(self, encoding=None, **kwargs):  # pylint: disable=W0613
1✔
239
        """Returns contents as bytes or string, depending on encoding parameter.
240
        If encoding is None, returns bytes, otherwise, returns
241
        a string.
242

243
        parameter "encoding" is default to None. This is consistent with boto2
244
        get_contents_as_string() method:
245
        http://boto.cloudhackers.com/en/latest/ref/s3.html#boto.s3.key.Key.get_contents_as_string
246
        """
247
        return self.base_client.get_contents_as_string(
1✔
248
            bucket=self.bucket, path=self.name, encoding=encoding
249
        )
250

251
    def set_contents_from_string(self, content, **kwargs):
1✔
252
        self.base_client.set_contents(
1✔
253
            bucket=self.bucket, path=self.name, content=content, **kwargs
254
        )
255

256
    def set_contents_from_file(self, source_file, **kwargs):
1✔
257
        return self.base_client.set_contents_from_file(
1✔
258
            source_file, bucket=self.bucket, path=self.name, ExtraArgs=kwargs
259
        )
260

261
    def get_object_head(self):
1✔
262
        return self.base_client.get_head(self.bucket, self.name)
1✔
263

264
    def generate_url(self, expire=0, query_auth=True):  # pylint: disable=W0613
1✔
265
        return self.base_client.generate_url(
1✔
266
            bucket=self.bucket, key=self.name, ExpiresIn=expire
267
        )
268

269
    def delete(self, **kwargs):
1✔
270
        self.base_client.delete_key(bucket=self.bucket, path=self.name, **kwargs)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc