• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geopython / OWSLib / 11628884181

01 Nov 2024 11:49AM UTC coverage: 58.814% (-1.3%) from 60.156%
11628884181

Pull #548

github

web-flow
Merge 4b9b7cf1f into ae98c2039
Pull Request #548: Strip out redundant __new__ to enable object pickling

8364 of 14221 relevant lines covered (58.81%)

1.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

20.0
/owslib/coverage/wcsdecoder.py
1
# WCS response decoder.
2
# Decodes response from a WCS (either a Coverages XML document or a Multipart MIME)
3
# and extracts the urls of the coverage data.
4
# Copyright (c) 2007 STFC <http://www.stfc.ac.uk>
5
# Author: Dominic Lowe, STFC
6
# contact email: d.lowe@rl.ac.uk
7
#
8
# Multipart MIME decoding based on http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/86676
9

10
# example: used in conjunction with ows lib wcs:
11

12
# from owslib.coverage import wcsdecoder
13
# u=wcs.getcoverage(identifier=['TuMYrRQ4'], timeSequence=['2792-06-01T00:00:00.0'], bbox=(-112,36,-106,41),
14
#                   format='application/netcdf', store='true')
15
# decoder=wcsdecoder.WCSDecoder(u)
16
# decoder.getCoverages()
17

18
import os
2✔
19
from owslib.etree import etree
2✔
20
import email
2✔
21
import errno
2✔
22
import mimetypes
2✔
23

24

25
class WCSDecoder(object):
    """Decode a WCS GetCoverage response and locate the coverage data.

    The response is either a Coverages XML document, whose ``Reference``
    elements point at the data through ``xlink:href`` attributes, or a
    Multipart MIME message that carries the data inline.
    """

    # Namespaces under which Coverage/Reference elements are looked up.
    _COVERAGE_NAMESPACES = (
        'http://www.opengis.net/wcs/1.1',
        'http://www.opengis.net/wcs/1.1.0/owcs',
    )
    _XLINK_HREF = '{http://www.w3.org/1999/xlink}href'

    def __init__(self, u):
        """Initiate with a urllib url object.

        :param u: file-like response providing ``readline()`` and ``read()``
            (returning either ``bytes`` or ``str``). Its first line is
            consumed immediately to detect the response type.
        """
        self.u = u
        self._head = None     # first line of the response, kept by _getType()
        self._content = None  # complete response body, cached on first use
        self._getType()

    def _getType(self):
        """Determine whether it is a Multipart Mime or a Coverages XML file.

        Sets ``self.urlType`` to ``'XML'`` when the first line starts with an
        XML declaration and to ``'Multipart'`` otherwise.
        """
        # Sniffing the start of the stream is only a heuristic. The line read
        # here is kept so that the full body can be reassembled later; the
        # stream itself cannot be assumed to be seekable.
        head = self.u.readline()
        self._head = head
        if isinstance(head, bytes):
            marker = b'<?xml version='
        else:
            marker = '<?xml version='
        if head.startswith(marker):
            self.urlType = 'XML'
        else:
            self.urlType = 'Multipart'

    def _readAll(self):
        """Return the complete response body, including the sniffed line."""
        if self._content is None:
            self._content = self._head + self.u.read()
        return self._content

    def getCoverages(self, unpackdir='./unpacked'):
        """Return the locations of the coverage data in the response.

        :param unpackdir: directory that Multipart MIME parts are written to;
            not used for XML responses.
        :returns: for an XML response, the list of ``xlink:href`` values of
            all Coverage/Reference elements; for a Multipart response, the
            list of file paths returned by ``MpartMime.unpackToDir``.
        """
        paths = []
        if self.urlType == 'XML':
            u_xml = self._readAll()
            if isinstance(u_xml, str):
                # Already-decoded text: drop the XML declaration, since some
                # parsers (lxml) reject an encoding declaration in a str.
                decl_end = u_xml.find('?>')
                if decl_end != -1:
                    u_xml = u_xml[decl_end + 2:]
            u_tree = etree.fromstring(u_xml)
            for namespace in self._COVERAGE_NAMESPACES:
                pattern = '{%s}Coverage/{%s}Reference' % (namespace, namespace)
                for ref in u_tree.findall(pattern):
                    paths.append(ref.attrib[self._XLINK_HREF])
        elif self.urlType == 'Multipart':
            # Decode multipart mime and return the paths of the written files
            mpart = MpartMime(self._readAll())
            paths = mpart.unpackToDir(unpackdir)
        return paths
61

62

63
class MpartMime(object):
    """A Multipart MIME message whose parts can be unpacked to disk."""

    def __init__(self, mpartmime):
        """mpartmime is a multipart mime file that has already been read in.

        :param mpartmime: the complete message, as ``bytes`` or ``str``.
        """
        self.mpartmime = mpartmime

    def unpackToDir(self, unpackdir):
        """Unpack the contents of the Multipart mime to a given directory.

        Each non-container part is written to its own file. A part's declared
        filename is reduced to its base name; parts without a usable filename
        are named ``part-NNN<ext>``, where NNN is the position of the part and
        the extension is guessed from its content type (``.bin`` if unknown).

        :param unpackdir: target directory, created if it does not exist.
        :returns: list of the paths written, in message order.
        :raises OSError: if the directory or a file cannot be created.
        """
        os.makedirs(unpackdir, exist_ok=True)

        # The email package has separate parsers for bytes and text input.
        if isinstance(self.mpartmime, (bytes, bytearray)):
            msg = email.message_from_bytes(self.mpartmime)
        else:
            msg = email.message_from_string(self.mpartmime)

        names = []
        counter = 1
        for part in msg.walk():
            # multipart/* are just containers, ignore
            if part.get_content_maintype() == 'multipart':
                continue
            filename = self._safeFilename(part.get_filename())
            if not filename:
                ext = mimetypes.guess_extension(part.get_content_type())
                if not ext:
                    # Use a generic extension
                    ext = '.bin'
                filename = 'part-%03d%s' % (counter, ext)
            # Advance for every part so generated names never collide.
            counter += 1
            fullpath = os.path.join(unpackdir, filename)
            names.append(fullpath)
            # Decode before opening so a decoding failure leaves no empty file.
            payload = part.get_payload(decode=True)
            with open(fullpath, 'wb') as fp:
                fp.write(payload)
        return names

    @staticmethod
    def _safeFilename(filename):
        """Return the base name of a declared filename, or None if unusable.

        The name comes from the message itself, so any directory components
        are dropped to keep the output inside the unpack directory.
        """
        if not filename:
            return None
        name = os.path.basename(filename.replace('\\', '/'))
        if name in ('', '.', '..'):
            return None
        return name
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc