ICRAR / daliuge — build 4911681207 (pending completion)

GitHub Pull Request #231: Liu 355
Merge 9186e10d1 into e48989cce

180 of 229 new or added lines in 17 files covered (78.6%)
26 existing lines in 5 files now uncovered
15345 of 19059 relevant lines covered (80.51%)
1.65 hits per line

Source file: /daliuge-engine/dlg/data/drops/s3_drop.py — 26.74%
#
#    ICRAR - International Centre for Radio Astronomy Research
#    (c) UWA - The University of Western Australia
#    Copyright by UWA (in the framework of the ICRAR)
#    All rights reserved
#
#    This library is free software; you can redistribute it and/or
#    modify it under the terms of the GNU Lesser General Public
#    License as published by the Free Software Foundation; either
#    version 2.1 of the License, or (at your option) any later version.
#
#    This library is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#    Lesser General Public License for more details.
#
#    You should have received a copy of the GNU Lesser General Public
#    License along with this library; if not, write to the Free Software
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston,
#    MA 02111-1307  USA
#
"""
Drops that interact with S3
"""
import logging
from io import BytesIO
from typing import Tuple

from overrides import overrides

# use a module-level logger rather than borrowing asyncio's logger
logger = logging.getLogger(__name__)

try:
    import boto3
    import botocore

except ImportError:
    logger.warning("BOTO bindings are not available")

from .data_base import DataDROP
from dlg.data.io import ErrorIO, OpenMode, DataIO
from dlg.meta import (
    dlg_component,
    dlg_batch_input,
    dlg_batch_output,
    dlg_streaming_input,
    dlg_string_param,
    dlg_list_param,
)

from dlg.named_port_utils import identify_named_ports, check_ports_dict


##
# @brief S3
# @details An object available in a bucket on an S3 (Simple Storage Service) object storage platform
# @par EAGLE_START
# @param category S3
# @param tag daliuge
# @param data_volume Data volume/5/Float/ComponentParameter/readwrite//False/False/Estimated size of the data contained in this node
# @param group_end Group end/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the end of a group?
# @param Bucket Bucket//String/ComponentParameter/readwrite//False/False/The S3 Bucket
# @param Key Key//String/ComponentParameter/readwrite//False/False/The S3 object key
# @param profile_name Profile Name//String/ComponentParameter/readwrite//False/False/The S3 profile name
# @param endpoint_url Endpoint URL//String/ComponentParameter/readwrite//False/False/The URL exposing the S3 REST API
# @param dropclass dropclass/dlg.data.drops.s3_drop.S3DROP/String/ComponentParameter/readwrite//False/False/The class implementing this data drop
# @param streaming Streaming/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component streams input and output data
# @param persist Persist/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component contains data that should not be deleted after execution
# @param dummy dummy//Object/InputPort/readwrite//False/False/Dummy input port
# @param dummy dummy//Object/OutputPort/readwrite//False/False/Dummy output port
# @par EAGLE_END
class S3DROP(DataDROP):
    """
    A DROP that points to data stored in S3
    """

    component_meta = dlg_component(
        "S3DROP",
        "S3 Data Drop",
        [dlg_batch_input("binary/*", [])],
        [dlg_batch_output("binary/*", [])],
        [dlg_streaming_input("binary/*")],
    )

    Bucket = dlg_string_param("Bucket", None)
    Key = dlg_string_param("Key", None)
    storage_class = dlg_string_param("storage_class", "S3")
    tags = dlg_list_param("tags", None)
    # don't change the aws names
    aws_access_key_id = dlg_string_param("aws_access_key_id", None)
    aws_secret_access_key = dlg_string_param("aws_secret_access_key", None)
    profile_name = dlg_string_param("profile_name", None)
    endpoint_url = dlg_string_param("endpoint_url", None)

    def initialize(self, **kwargs):
        self.keyargs = {
            "Bucket": self.Bucket,
            "Key": self.Key,
            "storage_class": self.storage_class,
            "tags": self.tags,
            "aws_access_key_id": self.aws_access_key_id,
            "aws_secret_access_key": self.aws_secret_access_key,
            "profile_name": self.profile_name,
            "endpoint_url": self.endpoint_url,
        }
        logger.debug("S3 initializing: %s", self.keyargs)
        self.Key = self.uid if not self.Key else self.Key
        return super().initialize(**kwargs)

    @property
    def path(self) -> str:
        """
        Returns the path to the S3 object
        :return: the path
        """
        return "{}/{}".format(self.Bucket, self.Key)

    @property
    def dataURL(self) -> str:
        return "s3://{}/{}".format(self.Bucket, self.Key)

    @property
    def size(self) -> int:
        size = self.getIO()._size()
        logger.debug("Size of object: %s", size)
        return size

    def getIO(self) -> DataIO:
        """
        Return an S3IO instance configured from this drop's parameters.
        :return: the DataIO object
        """
        logger.debug("S3DROP producers: %s", self._producers)
        if check_ports_dict(self._producers):
            self.mapped_inputs = identify_named_ports(
                self._producers, {}, self.keyargs, mode="inputs"
            )
        logger.debug("Parameters found: %s", self.parameters)
        return S3IO(
            self.aws_access_key_id,
            self.aws_secret_access_key,
            self.profile_name,
            self.Bucket,
            self.Key,
            self.endpoint_url,
            self._expectedSize,
        )


class S3IO(DataIO):
    """
    IO class for the S3 Drop
    """

    _desc = None

    def __init__(
        self,
        aws_access_key_id=None,
        aws_secret_access_key=None,
        profile_name=None,
        Bucket=None,
        Key=None,
        endpoint_url=None,
        expectedSize=-1,
        **kwargs,
    ):
        super().__init__(**kwargs)

        logger.debug(
            "key_id: %s; key: %s; profile: %s; bucket: %s; object_id: %s; %s",
            aws_access_key_id,
            aws_secret_access_key,
            profile_name,
            Bucket,
            Key,
            endpoint_url,
        )
        self._s3 = None
        self._s3_access_key_id = aws_access_key_id
        self._s3_secret_access_key = aws_secret_access_key
        self._s3_endpoint_url = endpoint_url
        self._profile_name = profile_name
        self._bucket = Bucket
        self._key = Key
        self._s3 = self._get_s3_connection()
        self.url = f"{endpoint_url}/{Bucket}/{Key}"
        self._expectedSize = expectedSize
        self._buffer = b""
        if self._mode == 1:
            # mode 1 is a read request; fall back to write mode (0)
            # if the object does not exist yet
            try:
                self._s3Stream = self._open()
            except botocore.exceptions.ClientError:
                if not self.exists():
                    logger.debug("Object does not exist yet. Creating!")
                    self._mode = 0

    def _get_s3_connection(self):
        s3 = None
        if self._s3 is None:
            if self._profile_name is not None or (
                self._s3_access_key_id is not None
                and self._s3_secret_access_key is not None
            ):
                logger.debug("Opening boto3 session")
                session = boto3.Session(profile_name=self._profile_name)
                s3 = session.client(
                    service_name="s3",
                    endpoint_url=self._s3_endpoint_url,
                )
            else:
                # no profile or explicit keys: fall back to the default credential chain
                s3 = boto3.resource("s3")
        else:
            s3 = self._s3
        return s3

    def _open(self, **kwargs):
        logger.debug("Opening S3 object %s in mode %s", self._key, self._mode)
        if self._mode == OpenMode.OPEN_WRITE:
            exists = self._exists()
            if exists == (True, True):
                logger.error("Object exists already. Assuming part upload.")

            elif not exists[0]:
                # bucket does not exist, create first
                try:
                    self._s3.create_bucket(Bucket=self._bucket)
                except botocore.exceptions.ClientError as e:
                    raise e
            resp = self._s3.create_multipart_upload(
                Bucket=self._bucket,
                Key=self._key,
            )
            self._uploadId = resp["UploadId"]
            self._buffer = b""
            self._written = 0
            self._partNo = 1
            self._parts = {"Parts": []}
            return self._s3
        else:
            s3Object = self._s3.get_object(Bucket=self._bucket, Key=self._key)
            self._desc = s3Object["Body"]
        return s3Object["Body"]

    @overrides
    def _read(self, count=-1, **kwargs):
        # Read data from S3 and give it back to our reader
        if not self._desc:
            self._desc = self._open()
        if count != -1:
            return self._desc.read(count)
        else:
            return self._desc.read()

    def _writeBuffer2S3(self, write_buffer=b""):
2✔
257
        try:
×
258
            with BytesIO(write_buffer) as f:
×
259
                self._s3.upload_part(
×
260
                    Body=f,
261
                    Bucket=self._bucket,
262
                    Key=self._key,
263
                    UploadId=self._uploadId,
264
                    PartNumber=self._partNo,
265
                )
266
            logger.debug(
×
267
                "Wrote %d bytes part %d to S3: %s",
268
                len(write_buffer),
269
                self._partNo,
270
                self.url,
271
            )
272
            self._partNo += 1
×
273
            self._written += len(write_buffer)
×
274
        except botocore.exceptions.ClientError as e:
×
275
            logger.error("Writing to S3 failed")
×
276
            return -1
×
277

278
    @overrides
    def _write(self, data, **kwargs) -> int:
        """
        Buffer incoming data and upload it to S3 in parts of 5 MiB,
        the minimum part size S3 accepts for a multipart upload.
        """
        self._buffer += data
        PART_SIZE = 5 * 1024**2
        logger.debug("Length of S3 buffer: %d", len(self._buffer))
        if len(self._buffer) >= PART_SIZE:
            self._writeBuffer2S3(self._buffer[:PART_SIZE])
            self._buffer = self._buffer[PART_SIZE:]
        return len(data)  # we return the length of what we have received
        # to keep the client happy

    def _get_object_head(self) -> dict:
        return self._s3.head_object(Bucket=self._bucket, Key=self._key)

    @overrides
    def _size(self, **kwargs) -> int:
        if self.exists():
            object_head = self._get_object_head()
            logger.debug("Size of object: %s", object_head["ContentLength"])
            return object_head["ContentLength"]
        return -1

    @overrides
    def _close(self, **kwargs):
        if self._mode == OpenMode.OPEN_WRITE:
            if (
                len(self._buffer) > 0
            ):  # write, if there is still something in the buffer
                self._writeBuffer2S3(self._buffer)
            # complete multipart upload and cleanup
            res = self._s3.list_parts(
                Bucket=self._bucket, Key=self._key, UploadId=self._uploadId
            )
            parts = [
                {"ETag": p["ETag"], "PartNumber": p["PartNumber"]}
                for p in res["Parts"]
            ]
            # TODO: Check checksum!
            res = self._s3.complete_multipart_upload(
                Bucket=self._bucket,
                Key=self._key,
                UploadId=self._uploadId,
                MultipartUpload={"Parts": parts},
            )
            del self._buffer
            logger.info(
                "Wrote a total of %.1f MB to %s",
                self._written / (1024**2),
                self.url,
            )

        if self._desc is not None:  # only the read path opens a body descriptor
            self._desc.close()
        del self._s3

    def _exists(self) -> Tuple[bool, bool]:
        """
        Need to have a way to get bucket and object existence
        separately for writing.
        """
        s3 = self._s3

        try:
            # s3.meta.client.head_bucket(Bucket=self.bucket)
            logger.info("Checking existence of bucket: %s", self._bucket)
            s3.head_bucket(Bucket=self._bucket)
            logger.info("Bucket: %s exists", self._bucket)
        except botocore.exceptions.ClientError as e:
            # If a client error is thrown, then check that it was a 404 error.
            # If it was a 404 error, then the bucket does not exist.
            error_code = int(e.response["Error"]["Code"])
            if error_code == 404:
                logger.info("Bucket: %s does not exist", self._bucket)
                return False, False
            elif error_code == 403:
                logger.info("Access to bucket %s is forbidden", self._bucket)
                return False, False
            elif error_code > 300:
                logger.info(
                    "Error code %s when accessing bucket %s",
                    error_code,
                    self._bucket,
                )
        try:
            logger.info("Checking existence of object: %s", self._key)
            s3.head_object(Bucket=self._bucket, Key=self._key)
            logger.info("Object: %s exists", self._key)
            return True, True
        except botocore.exceptions.ClientError as e:
            # If a client error is thrown, then check that it was a 404 error.
            # If it was a 404 error, then the object does not exist.
            error_code = int(e.response["Error"]["Code"])
            if error_code == 404:
                logger.info("Object: %s does not exist", self._key)
                return True, False
            else:
                raise ErrorIO()

    @overrides
    def exists(self) -> bool:
        bucket_exists, object_exists = self._exists()
        return bucket_exists and object_exists

    @overrides
    def delete(self):
        if self.exists():
            self._s3.delete_object(Bucket=self._bucket, Key=self._key)
            return 0
        else:
            return ErrorIO()
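
The write path above (_open, _write/_writeBuffer2S3 and _close) follows the standard S3 multipart-upload sequence: create the upload, send parts of at least 5 MiB, then complete the upload with the collected part numbers and ETags. For reference, the following is a minimal standalone sketch of that same sequence using plain boto3, outside of the DROP framework; the profile, endpoint, bucket and key names are hypothetical placeholders, and the sketch assumes an S3-compatible service is reachable with those credentials.

# Minimal sketch of the multipart-upload sequence mirrored by S3IO.
# The profile, endpoint, bucket and key below are hypothetical placeholders.
import boto3
from botocore.exceptions import ClientError

session = boto3.Session(profile_name="s3-demo")
s3 = session.client("s3", endpoint_url="https://s3.example.org")

bucket, key = "demo-bucket", "demo-object"
try:
    s3.create_bucket(Bucket=bucket)  # _open() creates the bucket if it is missing
except ClientError:
    pass  # bucket already exists

resp = s3.create_multipart_upload(Bucket=bucket, Key=key)
upload_id = resp["UploadId"]

payload = b"x" * (12 * 1024**2)  # stand-in for data arriving through _write()
PART_SIZE = 5 * 1024**2  # S3 requires parts of at least 5 MiB, except the last one
parts, part_no = [], 1
for off in range(0, len(payload), PART_SIZE):
    # each iteration corresponds to one _writeBuffer2S3() call
    part = s3.upload_part(
        Body=payload[off:off + PART_SIZE],
        Bucket=bucket,
        Key=key,
        UploadId=upload_id,
        PartNumber=part_no,
    )
    parts.append({"ETag": part["ETag"], "PartNumber": part_no})
    part_no += 1

# _close() finalises the upload with the collected part numbers and ETags
s3.complete_multipart_upload(
    Bucket=bucket,
    Key=key,
    UploadId=upload_id,
    MultipartUpload={"Parts": parts},
)

# the same head_object call backs _size() and the existence checks
print(s3.head_object(Bucket=bucket, Key=key)["ContentLength"])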