• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ICRAR / daliuge / 12310197548

13 Dec 2024 05:32AM UTC coverage: 79.799% (-0.004%) from 79.803%
12310197548

push

github

myxie
Test if changes to discovery tests fail on remote tests

3 of 3 new or added lines in 1 file covered. (100.0%)

1 existing line in 1 file now uncovered.

15651 of 19613 relevant lines covered (79.8%)

3.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.03
/daliuge-engine/dlg/data/io.py
1
#
2
#    ICRAR - International Centre for Radio Astronomy Research
3
#    (c) UWA - The University of Western Australia, 2015
4
#    Copyright by UWA (in the framework of the ICRAR)
5
#    All rights reserved
6
#
7
#    This library is free software; you can redistribute it and/or
8
#    modify it under the terms of the GNU Lesser General Public
9
#    License as published by the Free Software Foundation; either
10
#    version 2.1 of the License, or (at your option) any later version.
11
#
12
#    This library is distributed in the hope that it will be useful,
13
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
14
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15
#    Lesser General Public License for more details.
16
#
17
#    You should have received a copy of the GNU Lesser General Public
18
#    License along with this library; if not, write to the Free Software
19
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston,
20
#    MA 02111-1307  USA
21
#
22
from abc import abstractmethod, ABCMeta
4✔
23
from http.client import HTTPConnection
4✔
24
from multiprocessing.sharedctypes import Value
4✔
25
from overrides import overrides
4✔
26
import io
4✔
27
import logging
4✔
28
import os
4✔
29
import sys
4✔
30
import urllib.parse
4✔
31
from abc import abstractmethod, ABCMeta
4✔
32
from typing import Optional, Union
4✔
33

34
from dlg import ngaslite
4✔
35

36
if sys.version_info >= (3, 8):
4✔
37
    from dlg.shared_memory import DlgSharedMemory
4✔
38

39

40
logger = logging.getLogger(__name__)
4✔
41

42

43
class OpenMode:
    """
    Integer constants naming the two ways a data drop's I/O can be opened.
    """

    # Writing is 0 and reading is 1.
    OPEN_WRITE = 0
    OPEN_READ = 1
49

50

51
class DataIO:
    """
    A class used to read/write data stored in a particular kind of storage in an
    abstract way. This base class simply declares a number of methods that
    deriving classes must actually implement to handle different storage
    mechanisms (e.g., a local filesystem or an NGAS server).

    An instance of this class represents a particular piece of data. Thus at
    construction time users must specify a storage-specific unique identifier
    for the data that this object handles (e.g., a filename in the case of a
    DataIO class that works with local filesystem storage, or a host:port/fileId
    combination in the case of a class that works with an NGAS server).

    Once an instance has been created it can be opened via its `open` method
    indicating an open mode. If opened with `OpenMode.OPEN_READ`, only read
    operations will be allowed on the instance, and if opened with
    `OpenMode.OPEN_WRITE` only writing operations will be allowed.

    The public methods (`open`, `read`, `write`, `close`, `size`) perform the
    mode bookkeeping and delegate to the underscore-prefixed hooks (`_open`,
    `_read`, `_write`, `_close`, `_size`) that subclasses provide.
    """

    # NOTE(review): `__metaclass__` is the Python 2 spelling and has no effect
    # on Python 3, so the @abstractmethod markers below are not enforced when
    # a subclass is instantiated. It is deliberately left as-is: switching to
    # `metaclass=ABCMeta` would stop any subclass that omits one of the
    # abstract hooks from being instantiated -- confirm before changing.
    __metaclass__ = ABCMeta

    # Current open mode; None means the object is closed.
    _mode: Optional[OpenMode]

    def __init__(self):
        self._mode = None

    def open(self, mode: OpenMode, **kwargs):
        """
        Opens the underlying storage where the data represented by this instance
        is stored. Depending on the value of `mode` subsequent calls to
        `self.read` or `self.write` will succeed or fail.

        The mode is recorded before `_open` is invoked so the hook can consult
        it; whatever `_open` returns is kept in `self._desc`.
        """
        self._mode = mode
        self._desc = self._open(**kwargs)

    def write(self, data, **kwargs) -> int:
        """
        Writes `data` into the storage and returns whatever `_write` returns.

        Raises `ValueError` if the object is closed or was opened for reading.
        """
        if self._mode is None:
            raise ValueError("Writing operation attempted on closed DataIO object")
        if self._mode == OpenMode.OPEN_READ:
            # The object was opened for reading, hence it is read-only.
            raise ValueError("Writing operation attempted on read-only DataIO object")
        return self._write(data, **kwargs)

    def read(self, count: int, **kwargs):
        """
        Reads `count` bytes from the underlying storage.

        Raises `ValueError` if the object is closed or was opened for writing.
        """
        if self._mode is None:
            raise ValueError("Reading operation attempted on closed DataIO object")
        if self._mode == OpenMode.OPEN_WRITE:
            raise ValueError("Reading operation attempted on write-only DataIO object")
        return self._read(count, **kwargs)

    def close(self, **kwargs):
        """
        Closes the underlying storage where the data represented by this
        instance is stored, freeing underlying resources.

        Closing an object that is not open is a no-op. The keyword arguments
        are accepted but not forwarded to `_close`.
        """
        if self._mode is None:
            return
        self._close()
        self._mode = None

    def size(self, **kwargs) -> int:
        """
        Returns the current total size of the underlying stored object. If the
        storage class does not support this it is supposed to return -1.
        """
        return self._size(**kwargs)

    def isOpened(self):
        """
        Returns true if the io is currently opened for read or write.
        """
        return self._mode is not None

    @abstractmethod
    def exists(self) -> bool:
        """
        Returns `True` if the data represented by this DataIO exists indeed in
        the underlying storage mechanism
        """

    @abstractmethod
    def delete(self):
        """
        Deletes the data represented by this DataIO
        """

    def buffer(self) -> Union[memoryview, bytes, bytearray]:
        """
        Gets a buffer protocol compatible object of the drop data.
        This may be a zero-copy view of the data or a copy depending
        on whether the drop stores data in cpu memory or not.

        This base implementation has no body and therefore returns None;
        subclasses that support it override this method.
        """

    @abstractmethod
    def _open(self, **kwargs):
        """Hook: open the storage and return the descriptor kept in `_desc`."""

    @abstractmethod
    def _read(self, count, **kwargs):
        """Hook: read up to `count` bytes from the storage."""

    @abstractmethod
    def _write(self, data, **kwargs) -> int:
        """Hook: write `data` to the storage."""

    @abstractmethod
    def _close(self, **kwargs):
        """Hook: release the resources obtained by `_open`."""

    @abstractmethod
    def _size(self, **kwargs) -> int:
        """Hook: report the size of the stored data."""
167

168

169
class NullIO(DataIO):
    """
    A DataIO implementation that discards everything written to it and never
    yields any data when read.
    """

    def _open(self, **kwargs):
        """There is nothing to open, so no descriptor is produced."""
        return None

    def _read(self, count=65536, **kwargs):
        """No data is ever available; always returns None."""
        return None

    def _write(self, data, **kwargs) -> int:
        """Drop `data` and report its full length as written."""
        written = len(data)
        return written

    def _close(self, **kwargs):
        """There is nothing to release."""

    @overrides
    def _size(self, **kwargs) -> int:
        """
        This storage class always reports a size of 0
        """
        return 0

    @overrides
    def exists(self) -> bool:
        """The (empty) data is always considered to exist."""
        return True

    @overrides
    def delete(self):
        """There is nothing to delete."""
200

201

202
class ErrorIO(DataIO):
    """
    A DataIO implementation whose hooks all raise `NotImplementedError` when
    invoked.
    """

    @overrides
    def _open(self, **kwargs):
        """Always raises `NotImplementedError`."""
        raise NotImplementedError()

    @overrides
    def _read(self, count=65536, **kwargs):
        """Always raises `NotImplementedError`."""
        raise NotImplementedError()

    @overrides
    def _write(self, data, **kwargs) -> int:
        """Always raises `NotImplementedError`."""
        raise NotImplementedError()

    @overrides
    def _close(self, **kwargs):
        """Always raises `NotImplementedError`."""
        raise NotImplementedError()

    @overrides
    def _size(self, **kwargs) -> int:
        """Always raises `NotImplementedError`."""
        raise NotImplementedError()

    @overrides
    def exists(self) -> bool:
        """Always raises `NotImplementedError`."""
        raise NotImplementedError()

    @overrides
    def delete(self):
        """Always raises `NotImplementedError`."""
        raise NotImplementedError()
234

235

236
class MemoryIO(DataIO):
    """
    A DataIO implementation backed by the `io.BytesIO` object handed in at
    construction time.
    """

    _desc: io.BytesIO  # TODO: This might actually be a problem

    def __init__(self, buf: io.BytesIO, **kwargs):
        super().__init__()
        self._buf = buf

    def _open(self, **kwargs):
        """
        Return the stream to operate on: a fresh BytesIO built from the current
        contents when reading, or the backing buffer itself when writing.
        Raises `ValueError` for any other mode.
        """
        if self._mode == OpenMode.OPEN_READ:
            # TODO: potentially wasteful copy
            return io.BytesIO(self._buf.getbuffer())
        if self._mode == OpenMode.OPEN_WRITE:
            return self._buf
        raise ValueError()

    @overrides
    def _write(self, data, **kwargs) -> int:
        """Append `data` to the open stream and report `len(data)`."""
        stream = self._desc
        stream.write(data)
        return len(data)

    @overrides
    def _read(self, count=65536, **kwargs):
        """Read up to `count` bytes from the open stream."""
        stream = self._desc
        return stream.read(count)

    @overrides
    def _close(self, **kwargs):
        """Close the read-side stream; leave the write-side buffer open."""
        # When writing, the descriptor *is* self._buf; closing it would make
        # the stored data unreadable afterwards, so only read streams are closed.
        if self._mode != OpenMode.OPEN_READ:
            return
        self._desc.close()

    @overrides
    def _size(self, **kwargs) -> int:
        """
        Number of bytes of user data held in the buffer (not the size of the
        Python object wrapping them)
        """
        view = self._buf.getbuffer()
        return view.nbytes

    @overrides
    def exists(self) -> bool:
        """The data exists for as long as the backing buffer is not closed."""
        if self._buf.closed:
            return False
        return True

    @overrides
    def delete(self):
        """Delete the data by closing the backing buffer."""
        self._buf.close()

    @overrides
    def buffer(self) -> memoryview:
        """Return the `getbuffer()` view over the backing buffer."""
        # TODO: This may also be an issue
        return self._buf.getbuffer()
292

293

294
# pylint: disable=possibly-used-before-assignment
295
class SharedMemoryIO(DataIO):
    """
    A DataIO class that writes to a shared memory buffer.

    The buffer is a `DlgSharedMemory` block named `"{session_id}_{uid}"`. It is
    attached in `_open` and released again in `_close`.
    """

    def __init__(self, uid, session_id, **kwargs):
        super().__init__()
        self._name = f"{session_id}_{uid}"
        self._written = 0  # number of bytes written since opening for write
        self._pos = 0  # read cursor
        self._buf = None  # attached DlgSharedMemory, None while closed

    @overrides
    def _open(self, **kwargs):
        """
        Attach to the shared memory block (if not already attached), rewind
        the read cursor and return the block.
        """
        self._pos = 0
        if self._buf is None:
            if self._mode == OpenMode.OPEN_WRITE:
                self._written = 0
            self._buf = DlgSharedMemory(self._name)
        return self._buf

    @overrides
    def _write(self, data, **kwargs) -> int:
        """
        Append `data` after the bytes already written and return `len(data)`.

        The block is resized so that it ends up exactly as large as the data
        written so far: it is grown before copying when too small, and
        otherwise resized to the new total after copying.
        """
        total_size = len(data) + self._written
        must_grow = total_size > self._buf.size
        if must_grow:
            self._buf.resize(total_size)
        self._buf.buf[self._written: total_size] = data
        self._written = total_size
        if not must_grow:
            # It may be inefficient to resize many times, but assuming data is
            # written 'once' this might be tolerable and guarantees that the
            # size of the underlying buffer is tight.
            self._buf.resize(total_size)
        return len(data)

    @overrides
    def _read(self, count=65536, **kwargs):
        """
        Return up to `count` bytes starting at the read cursor and advance the
        cursor, or None once the cursor has reached the end of the block.
        """
        if self._pos == self._buf.size:
            return None
        start = self._pos
        end = min(self._pos + count, self._buf.size)
        out = self._buf.buf[start:end]
        self._pos = end
        return out

    @overrides
    def _close(self, **kwargs):
        """
        Detach from the shared memory block. When writing, the block is first
        resized to the number of bytes written.

        Does nothing when no block is attached, so that `delete()` on a closed
        object, or `close()` after `delete()`, does not dereference None.
        """
        if self._buf is None:
            return
        if self._mode == OpenMode.OPEN_WRITE:
            self._buf.resize(self._written)
        self._buf.close()
        self._buf = None

    @overrides
    def _size(self, **kwargs) -> int:
        """Size of the attached block; only usable while a block is attached."""
        return self._buf.size

    @overrides
    def exists(self) -> bool:
        """True while a shared memory block is attached."""
        return self._buf is not None

    @overrides
    def delete(self):
        """Release the attached block, if any."""
        self._close()
360

361
# pylint: enable=possibly-used-before-assignment
362

363
class FileIO(DataIO):
    """
    A file-based implementation of DataIO.

    The file named at construction time is opened in binary mode, for reading
    when the open mode is `OpenMode.OPEN_READ` and for writing otherwise.
    """

    # open() in "rb"/"wb" mode yields a buffered binary reader or writer.
    _desc: io.BufferedIOBase

    def __init__(self, filename, **kwargs):
        super().__init__()
        self._fnm = filename

    def _open(self, **kwargs) -> io.BufferedIOBase:
        """Open the file in binary read or write mode and return the handle."""
        # Compare by value: the modes are plain ints, so identity comparison
        # would only work by virtue of interpreter-level int caching.
        flag = "rb" if self._mode == OpenMode.OPEN_READ else "wb"
        return open(self._fnm, flag)

    @overrides
    def _read(self, count=65536, **kwargs):
        """Read up to `count` bytes from the open file."""
        return self._desc.read(count)

    @overrides
    def _write(self, data, **kwargs) -> int:
        """Write `data` to the open file and return `len(data)`."""
        self._desc.write(data)
        return len(data)

    @overrides
    def _close(self, **kwargs):
        """Close the open file handle."""
        self._desc.close()

    @overrides
    def _size(self, **kwargs) -> int:
        """Size of the file on disk, as reported by `os.path.getsize`."""
        return os.path.getsize(self._fnm)

    def getFileName(self):
        """
        Returns the drop filename
        """
        return self._fnm

    @overrides
    def exists(self) -> bool:
        """True if the filename refers to an existing regular file."""
        return os.path.isfile(self._fnm)

    @overrides
    def delete(self):
        """Remove the file from the filesystem."""
        os.unlink(self._fnm)

    @overrides
    def buffer(self) -> bytes:
        """
        Read and return everything from the current position of the open file
        to its end. The object must have been opened beforehand.
        """
        return self._desc.read(-1)
413

414

415
class NgasIO(DataIO):
    """
    A DROP whose data is finally stored into NGAS. Since NGAS doesn't
    support appending data to existing files, all the data is accumulated in
    memory while writing and sent to the NGAS destination when the object is
    closed.
    """

    def __init__(
        self,
        hostname,
        fileId,
        port=7777,
        ngasConnectTimeout=2,
        ngasTimeout=2,
        length=-1,
        mimeType="application/octet-stream",
    ):
        """
        Record the NGAS connection parameters. Whatever error is raised while
        importing the NGAMS client library is logged and re-raised.
        """
        # Check that we actually have the NGAMS client libraries
        try:
            from ngamsPClient import ngamsPClient  # @UnusedImport @UnresolvedImport
        except Exception:
            logger.error("No NGAMS client libs found, cannot use NgasIO")
            raise

        super().__init__()
        self._ngasSrv = hostname
        self._ngasPort = port
        self._ngasConnectTimeout = ngasConnectTimeout
        self._ngasTimeout = ngasTimeout
        self._fileId = fileId
        self._length = length
        self._mimeType = mimeType

    def _getClient(self):
        """Create a new NGAMS client for the configured server and port."""
        from ngamsPClient import ngamsPClient  # @UnresolvedImport

        return ngamsPClient.ngamsPClient(
            self._ngasSrv, self._ngasPort, self._ngasTimeout
        )

    @overrides
    def _open(self, **kwargs):
        """
        Return a new NGAMS client; when writing, also reset the in-memory
        buffer that accumulates the data.
        """
        if self._mode == OpenMode.OPEN_WRITE:
            # The NGAS client API doesn't have a way to continually feed an ARCHIVE
            # request with data. Thus the only way we can currently archive data
            # into NGAS is by accumulating it all on our side and finally
            # sending it over.
            self._buf = b""
            self._writtenDataSize = 0
        return self._getClient()

    @overrides
    def _close(self, **kwargs):
        """
        When writing, send the accumulated buffer to NGAS with a QARCHIVE
        request and raise `Exception` if the reply status is not 200. In all
        successful cases the client reference is dropped afterwards.
        """
        client = self._desc
        if self._mode == OpenMode.OPEN_WRITE:
            reply, _msg, _, _ = client._httpPost(
                client.getHost(),
                client.getPort(),
                "QARCHIVE",
                self._mimeType,
                dataRef=self._buf,
                pars=[["filename", self._fileId]],
                dataSource="BUFFER",
                dataSize=self._writtenDataSize,
            )
            self._buf = None
            if reply.http_status != 200:
                # Probably msg is not enough, we need to unpack the status XML doc
                # from the returning data and extract the real error message from
                # there
                raise Exception(reply.message)

        # Release the reference to _desc so the client object gets destroyed
        del self._desc

    @overrides
    def _read(self, count, **kwargs):
        """
        Ask the client to retrieve the file via QRETRIEVE. `count` is ignored
        and nothing is returned to the caller.
        """
        # Read data from NGAS and give it back to our reader
        self._desc.retrieve2File(self._fileId, cmd="QRETRIEVE")

    @overrides
    def _write(self, data, **kwargs) -> int:
        """
        Append `data` (a bytes-like object) to the in-memory buffer and return
        `len(data)`.
        """
        # The buffer is `bytes`; appending `str(data)` (as this method used to
        # do for bytes input) raises TypeError on Python 3.
        self._buf += data
        self._writtenDataSize += len(data)
        return len(data)

    @overrides
    def exists(self) -> bool:
        """True if a STATUS command for the file id reports success."""
        import ngamsLib  # @UnresolvedImport

        status = self._getClient().sendCmd("STATUS", pars=[["fileId", self._fileId]])
        return status.getStatus() == ngamsLib.ngamsCore.NGAMS_SUCCESS

    def fileStatus(self):
        """
        Return the attributes of the first file object of the first disk in
        the STATUS reply as a dict. Raises `FileNotFoundError` if the status
        is not a success.
        """
        import ngamsLib  # @UnresolvedImport

        # status = self._getClient().sendCmd('STATUS', pars=[['fileId', self._fileId]])
        # `_getClient` must be called to obtain a client instance.
        # NOTE(review): that the NGAMS client exposes `fileStatus` cannot be
        # verified from this file -- confirm against the client library.
        status = self._getClient().fileStatus("STATUS?file_id=%s" % self._fileId)
        if status.getStatus() != ngamsLib.ngamsCore.NGAMS_SUCCESS:
            raise FileNotFoundError
        fs = dict(
            status.getDiskStatusList()[0]
            .getFileObjList()[0]
            .genXml()
            .attributes.items()
        )
        return fs

    @overrides
    def _size(self, **kwargs) -> int:
        """
        Number of bytes written so far. The counter is only set up when the
        object is opened for writing.
        """
        return self._writtenDataSize

    def delete(self):
        pass  # We never delete stuff from NGAS
533

534

535
class NgasLiteIO(DataIO):
    """
    An IO class whose data ends up stored in NGAS. It relies on DALiuGE's own
    `ngaslite` module rather than on the full client-side libraries shipped
    with NGAS, since the latter might not be installed everywhere.

    The `ngaslite` module doesn't support the STATUS command yet, and because of
    that this class will throw an error if its `exists` method is invoked.
    """

    _desc: HTTPConnection

    def __init__(
        self,
        hostname,
        fileId,
        port=7777,
        ngasConnectTimeout=2,
        ngasTimeout=2,
        length=-1,
        mimeType="application/octet-stream",
    ):
        super().__init__()
        self._ngasSrv, self._ngasPort = hostname, port
        self._ngasConnectTimeout = ngasConnectTimeout
        self._ngasTimeout = ngasTimeout
        self._fileId = fileId
        self._length = length
        self._mimeType = mimeType

    def _is_length_unknown(self):
        """True when no (non-negative) length was given for the data."""
        length = self._length
        return length is None or length < 0

    def _getClient(self):
        """Open an `ngaslite` handle for the file using the current mode."""
        return ngaslite.open(
            self._ngasSrv,
            self._fileId,
            port=self._ngasPort,
            timeout=self._ngasTimeout,
            mode=self._mode,
            mimeType=self._mimeType,
            length=self._length,
        )

    def _open(self, **kwargs):
        """
        Return the `ngaslite` handle. When writing with an unknown length an
        in-memory buffer is prepared first; when reading the handle is also
        remembered as the source that `_read` pulls from.
        """
        writing = self._mode == OpenMode.OPEN_WRITE
        if writing and self._is_length_unknown():
            # NGAS does not support HTTP chunked writes and thus it requires the Content-length
            # of the whole fileObject to be known and sent up-front as part of the header. Thus
            # if the size is not provided all data will be buffered IN MEMORY and only sent to
            # NGAS when finishArchive is called.
            self._buf = b""
            self._writtenDataSize = 0
        client = self._getClient()
        if not writing:
            self._read_gen = client
        return client

    def _close(self, **kwargs):
        """
        When reading, close the handle. When writing, send the buffered data
        (if the length was unknown), finish the archive request and close the
        connection.
        """
        if self._mode != OpenMode.OPEN_WRITE:
            response = self._desc
            response.close()
            return

        conn = self._desc
        if self._is_length_unknown():
            # If length wasn't known up-front we first send Content-Length and then the buffer here.
            conn.putheader("Content-Length", len(self._buf))
            conn.endheaders()
            logger.debug("Sending data for file %s to NGAS", self._fileId)
            conn.send(self._buf)
            self._buf = None
        else:
            logger.debug(
                f"Length is known, assuming data has been sent ({self._fileId}, {self._length})"
            )
        ngaslite.finishArchive(conn, self._fileId)
        conn.close()

    def _read(self, count=65536, **kwargs):
        """
        Return the next item from the read handle, or `b""` once it is
        exhausted. `count` is not used.
        """
        try:
            chunk = self._read_gen.__next__()
        except StopIteration:
            return b""
        return chunk

    def _write(self, data, **kwargs) -> int:
        """
        Buffer `data` in memory when the length is unknown, otherwise send it
        straight over the connection. Returns `len(data)`.
        """
        if not self._is_length_unknown():
            self._desc.send(data)
        else:
            self._buf += data
        # logger.debug("Wrote %s bytes", len(data))
        return len(data)

    def exists(self) -> bool:
        """Not supported; always raises `NotImplementedError`."""
        raise NotImplementedError("This method is not supported by this class")

    def fileStatus(self):
        """Return `ngaslite.fileStatus` for the configured server and file."""
        logger.debug("Getting status of file %s", self._fileId)
        return ngaslite.fileStatus(self._ngasSrv, self._ngasPort, self._fileId)

    @overrides
    def delete(self):
        pass  # We never delete stuff from NGAS
640

641

642
def IOForURL(url):
    """
    Returns a DataIO instance that handles the given URL for reading. If no
    suitable DataIO class can be found to handle the URL, `None` is returned.

    Supported schemes:

    * `file`: a `FileIO` for the URL path, but only when the host part is
      `localhost` or this machine's node name.
    * `null`: a `NullIO`.
    * `ngas`: an `NgasIO` for `host[:port]/fileId` (port defaults to 7777),
      falling back to `NgasLiteIO` if `NgasIO` cannot be constructed.
    """
    parsed = urllib.parse.urlparse(url)
    # Named `data_io` so the module-level `io` import is not shadowed.
    data_io = None
    if parsed.scheme == "file":
        hostname = parsed.netloc
        if hostname == "localhost" or hostname == os.uname()[1]:
            data_io = FileIO(parsed.path)
    elif parsed.scheme == "null":
        data_io = NullIO()
    elif parsed.scheme == "ngas":
        networkLocation = parsed.netloc
        if ":" in networkLocation:
            hostname, port = networkLocation.split(":")
            port = int(port)
        else:
            hostname = networkLocation
            port = 7777
        fileId = parsed.path[1:]  # remove the leading slash
        try:
            data_io = NgasIO(hostname, fileId, port)
        except Exception:
            # Any failure to set up NgasIO (typically the NGAMS client
            # libraries being unavailable) falls back to the lightweight
            # client. KeyboardInterrupt/SystemExit are left to propagate.
            logger.warning("NgasIO not available, using NgasLiteIO instead")
            data_io = NgasLiteIO(hostname, fileId, port)

    logger.debug("I/O chosen for dataURL %s: %r", parsed, data_io)

    return data_io
678

679

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc