• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

johntruckenbrodt / spatialist / 8661843611

12 Apr 2024 12:13PM UTC coverage: 72.895%. First build
8661843611

push

github

web-flow
Merge pull request #45 from johntruckenbrodt/maintenance

general package maintenance

2 of 2 new or added lines in 1 file covered. (100.0%)

1931 of 2649 relevant lines covered (72.9%)

0.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.25
/spatialist/sqlite_util.py
1
import os
1✔
2
import re
1✔
3
import platform
1✔
4
import zipfile as zf
1✔
5

6
from ctypes.util import find_library
1✔
7

8

9
def check_loading():
1✔
10
    try:
1✔
11
        conn = sqlite3.connect(':memory:')
1✔
12
        conn.enable_load_extension(True)
1✔
13
    except (sqlite3.OperationalError, AttributeError):
×
14
        raise RuntimeError
×
15

16

17
errormessage = 'sqlite3 does not support loading extensions and {}; ' \
1✔
18
               'please refer to the spatialist installation instructions'
×
19
try:
1✔
20
    import sqlite3
1✔
21
    
22
    check_loading()
1✔
23
except RuntimeError:
×
24
    try:
×
25
        from pysqlite2 import dbapi2 as sqlite3
×
26
        
27
        check_loading()
×
28
    except ImportError:
×
29
        raise RuntimeError(errormessage.format('pysqlite2 does not exist as alternative'))
×
30
    except RuntimeError:
×
31
        raise RuntimeError(errormessage.format('neither does pysqlite2'))
×
32

33

34
def sqlite_setup(driver=':memory:', extensions=None, verbose=False):
1✔
35
    """
36
    Setup a sqlite3 connection and load extensions to it.
37
    This function intends to simplify the process of loading extensions to `sqlite3`, which can be quite difficult
38
    depending on the version used.
39
    Particularly loading `spatialite` has caused quite some trouble. In recent distributions of Ubuntu this has
40
    become much easier due to a new apt package `libsqlite3-mod-spatialite`. For use in Windows, `spatialist` comes
41
    with its own `spatialite` DLL distribution.
42
    See `here <https://www.gaia-gis.it/fossil/libspatialite/wiki?name=mod_spatialite>`_ for more details on loading
43
    `spatialite` as an `sqlite3` extension.
44

45
    Parameters
46
    ----------
47
    driver: str
48
        the database file or (by default) an in-memory database
49
    extensions: list
50
        a list of extensions to load
51
    verbose: bool
52
        print loading information?
53

54
    Returns
55
    -------
56
    sqlite3.Connection
57
        the database connection
58

59
    Example
60
    -------
61
    >>> from spatialist.sqlite_util import sqlite_setup
62
    >>> conn = sqlite_setup(extensions=['spatialite'])
63
    """
64
    conn = __Handler(driver, extensions, verbose=verbose)
1✔
65
    return conn.conn
1✔
66

67

68
def spatialite_setup():
1✔
69
    if platform.system() == 'Windows':
1✔
70
        directory = os.path.join(os.path.expanduser('~'), '.spatialist')
×
71
        subdir = os.path.join(directory, 'mod_spatialite')
×
72
        if not os.path.isdir(subdir):
×
73
            os.makedirs(subdir)
×
74
        mod_spatialite = os.path.join(subdir, 'mod_spatialite.dll')
×
75
        if not os.path.isfile(mod_spatialite):
×
76
            source_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)),
×
77
                                      'pkgs', 'mod_spatialite')
×
78
            # print('machine: {}'.format(platform.machine()))
79
            suffix = 'amd64' if platform.machine().endswith('64') else 'x86'
×
80
            source = os.path.join(source_dir, 'mod_spatialite-4.3.0a-win-{}.zip'.format(suffix))
×
81
            # print('extracting {} to {}'.format(os.path.basename(source), subdir))
82
            archive = zf.ZipFile(source, 'r')
×
83
            archive.extractall(subdir)
×
84
            archive.close()
×
85
        os.environ['PATH'] = '{}{}{}'.format(os.environ['PATH'], os.path.pathsep, subdir)
×
86

87

88
class __Handler(object):
1✔
89
    def __init__(self, driver=':memory:', extensions=None, verbose=False):
1✔
90
        self.conn = sqlite3.connect(driver)
1✔
91
        self.conn.enable_load_extension(True)
1✔
92
        self.extensions = []
1✔
93
        self.verbose = verbose
1✔
94
        if isinstance(extensions, list):
1✔
95
            for ext in extensions:
1✔
96
                self.load_extension(ext)
1✔
97
        elif extensions is not None:
1✔
98
            raise RuntimeError('extensions must either be a list or None')
1✔
99
        if verbose:
1✔
100
            print('using sqlite version {}'.format(self.version['sqlite']))
×
101
        if 'spatialite' in self.version.keys() and verbose:
1✔
102
            print('using spatialite version {}'.format(self.version['spatialite']))
×
103
    
104
    @property
1✔
105
    def version(self):
1✔
106
        out = {'sqlite': sqlite3.sqlite_version}
1✔
107
        cursor = self.conn.cursor()
1✔
108
        try:
1✔
109
            cursor.execute('SELECT spatialite_version()')
1✔
110
            spatialite_version = self.__encode(cursor.fetchall()[0][0])
1✔
111
            out['spatialite'] = spatialite_version
1✔
112
        except sqlite3.OperationalError:
1✔
113
            pass
1✔
114
        return out
1✔
115
    
116
    def get_tablenames(self):
1✔
117
        cursor = self.conn.cursor()
1✔
118
        cursor.execute('SELECT * FROM sqlite_master WHERE type="table"')
1✔
119
        names = [self.__encode(x[1]) for x in cursor.fetchall()]
1✔
120
        return names
1✔
121
    
122
    def load_extension(self, extension):
1✔
123
        if re.search('spatialite', extension):
1✔
124
            spatialite_setup()
1✔
125
            select = None
1✔
126
            # first try to load the dedicated mod_spatialite adapter
127
            for option in ['mod_spatialite', 'mod_spatialite.so', 'mod_spatialite.dll']:
1✔
128
                try:
1✔
129
                    self.conn.load_extension(option)
1✔
130
                    select = option
1✔
131
                    self.extensions.append(option)
1✔
132
                    if self.verbose:
1✔
133
                        print('loading extension {0} as {1}'.format(extension, option))
×
134
                    break
1✔
135
                except sqlite3.OperationalError as e:
×
136
                    if self.verbose:
×
137
                        print('{0}: {1}'.format(option, str(e)))
×
138
                    continue
×
139
            
140
            # if loading mod_spatialite fails try to load libspatialite directly
141
            if select is None:
1✔
142
                try:
×
143
                    self.__load_regular('spatialite')
×
144
                except RuntimeError as e:
×
145
                    raise type(e)(str(e) +
×
146
                                  '\nit can be installed via apt:'
×
147
                                  '\nsudo apt-get install libsqlite3-mod-spatialite')
×
148
            
149
            # initialize spatial support
150
            if 'spatial_ref_sys' not in self.get_tablenames():
1✔
151
                cursor = self.conn.cursor()
1✔
152
                if select is None:
1✔
153
                    # libspatialite extension
154
                    cursor.execute('SELECT InitSpatialMetaData();')
×
155
                else:
×
156
                    # mod_spatialite extension
157
                    cursor.execute('SELECT InitSpatialMetaData(1);')
1✔
158
                self.conn.commit()
1✔
159
        
160
        else:
×
161
            self.__load_regular(extension)
1✔
162
    
163
    def __load_regular(self, extension):
1✔
164
        options = []
1✔
165
        
166
        # create an extension library option starting with 'lib' without extension suffices;
167
        # e.g. 'libgdal' but not 'gdal.so'
168
        ext_base = self.__split_ext(extension)
1✔
169
        if not ext_base.startswith('lib'):
1✔
170
            ext_base = 'lib' + ext_base
1✔
171
        options.append(ext_base)
1✔
172
        
173
        # get the full extension library name; e.g. 'libgdal.so.20'
174
        ext_mod = find_library(extension.replace('lib', ''))
1✔
175
        if ext_mod is None:
1✔
176
            raise RuntimeError('no library found for extension {}'.format(extension))
×
177
        options.append(ext_mod)
1✔
178
        
179
        # loop through extension library name options and try to load them
180
        success = False
1✔
181
        for option in options:
1✔
182
            try:
1✔
183
                self.conn.load_extension(option)
1✔
184
                self.extensions.append(option)
1✔
185
                if self.verbose:
1✔
186
                    print('loading extension {0} as {1}'.format(extension, option))
×
187
                success = True
1✔
188
                break
1✔
189
            except sqlite3.OperationalError:
×
190
                continue
×
191
        
192
        if not success:
1✔
193
            raise RuntimeError('failed to load extension {}'.format(extension))
×
194
    
195
    @staticmethod
1✔
196
    def __split_ext(extension):
1✔
197
        base = extension
1✔
198
        while re.search(r'\.', base):
1✔
199
            base = os.path.splitext(base)[0]
×
200
        return base
1✔
201
    
202
    @staticmethod
1✔
203
    def __encode(string, encoding='utf-8'):
1✔
204
        if not isinstance(string, str):
1✔
205
            return string.encode(encoding)
×
206
        else:
×
207
            return string
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc