pytroll / pyresample / build 4653744392 (pending completion)

GitHub Pull Request #511: Bump pypa/gh-action-pypi-publish from 1.8.4 to 1.8.5
Merge edb481dd6 into 7ca8789a3

12270 of 13075 relevant lines covered (93.84%)

3.75 hits per line

Source File

/pyresample/_spatial_mp.py (65.89% of lines covered)

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2010-2021 Pyresample developers
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.
"""Multiprocessing versions of KDTree and Proj classes."""

from __future__ import absolute_import

import ctypes
import multiprocessing as mp

import numpy as np
import pyproj

try:
    import numexpr as ne
except ImportError:
    ne = None

from ._multi_proc import Scheduler, shmem_as_ndarray

# Earth radius
R = 6370997.0


class cKDTree_MP(object):
    """Multiprocessing cKDTree subclass, shared memory."""

    def __init__(self, data, leafsize=10, nprocs=2, chunk=None,
                 schedule='guided'):
        """Prepare shared memory for KDTree operations.

        Same as cKDTree.__init__ except that an internal copy of the data
        is made in shared memory.

        Extra keyword arguments:
        chunk : Minimum chunk size for the load balancer.
        schedule: Strategy for balancing the work load
        ('static', 'dynamic' or 'guided').
        """
        self.n, self.m = data.shape
        # Allocate shared memory for the data
        self.shmem_data = mp.RawArray(ctypes.c_double, self.n * self.m)

        # View shared memory as an ndarray, and copy over the data.
        # The RawArray objects have information about the dtype and
        # buffer size.
        _data = shmem_as_ndarray(self.shmem_data).reshape((self.n, self.m))
        _data[:, :] = data

        # Store the tree parameters. The kd-tree itself is reconstructed in
        # each worker process from the copy in shared memory rather than the
        # original data (see _parallel_query below).
        self.leafsize = leafsize
        self._nprocs = nprocs
        self._chunk = chunk
        self._schedule = schedule

    def query(self, x, k=1, eps=0, p=2, distance_upper_bound=np.inf):
        """Query the kd-tree for the points in 'x', parallelized with multiple processes and shared memory."""
        # allocate shared memory for x and the results
        nx = x.shape[0]
        shmem_x = mp.RawArray(ctypes.c_double, nx * self.m)
        shmem_d = mp.RawArray(ctypes.c_double, nx * k)
        shmem_i = mp.RawArray(ctypes.c_int, nx * k)

        # view shared memory as ndarrays
        _x = shmem_as_ndarray(shmem_x).reshape((nx, self.m))
        if k == 1:
            _d = shmem_as_ndarray(shmem_d)
            _i = shmem_as_ndarray(shmem_i)
        else:
            _d = shmem_as_ndarray(shmem_d).reshape((nx, k))
            _i = shmem_as_ndarray(shmem_i).reshape((nx, k))

        # copy x to shared memory
        _x[:] = x

        # set up a scheduler to load balance the query
        scheduler = Scheduler(nx, self._nprocs, chunk=self._chunk,
                              schedule=self._schedule)

        # query with multiple processes
        query_args = [scheduler, self.shmem_data, self.n, self.m,
                      self.leafsize, shmem_x, nx, shmem_d, shmem_i,
                      k, eps, p, distance_upper_bound]

        _run_jobs(_parallel_query, query_args, self._nprocs)
        # return results (copied out of shared memory into private memory)
        return _d.copy(), _i.copy()


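# A minimal usage sketch (not part of the original module; the array shapes
# are illustrative): build the shared-memory tree once, then query it with
# several worker processes.
#
#     import numpy as np
#     data = np.random.rand(100000, 2)      # reference points, shape (n, m)
#     tree = cKDTree_MP(data, leafsize=10, nprocs=4)
#     pts = np.random.rand(500, 2)          # query points
#     dist, idx = tree.query(pts, k=1)      # 1-D arrays of length 500

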
class Proj_MP:
    """Multi-processing version of the pyproj Proj class."""

    def __init__(self, *args, **kwargs):
        self._args = args
        self._kwargs = kwargs

    def __call__(self, data1, data2, inverse=False, radians=False,
                 errcheck=False, nprocs=2, chunk=None, schedule='guided'):
        """Transform coordinates to the current coordinate system, parallelized with multiple processes and shared memory."""
        grid_shape = data1.shape
        n = data1.size

        # Create shared memory
        shmem_data1 = mp.RawArray(ctypes.c_double, n)
        shmem_data2 = mp.RawArray(ctypes.c_double, n)
        shmem_res1 = mp.RawArray(ctypes.c_double, n)
        shmem_res2 = mp.RawArray(ctypes.c_double, n)

        # view shared memory as ndarrays
        _data1 = shmem_as_ndarray(shmem_data1)
        _data2 = shmem_as_ndarray(shmem_data2)
        _res1 = shmem_as_ndarray(shmem_res1)
        _res2 = shmem_as_ndarray(shmem_res2)

        # copy input data to shared memory
        _data1[:] = data1.ravel()
        _data2[:] = data2.ravel()

        # set up a scheduler to load balance the projection
        scheduler = Scheduler(n, nprocs, chunk=chunk, schedule=schedule)

        # Projection with multiple processes
        proj_call_args = [scheduler, shmem_data1, shmem_data2, shmem_res1,
                          shmem_res2, self._args, self._kwargs, inverse,
                          radians, errcheck]

        _run_jobs(_parallel_proj, proj_call_args, nprocs)
        return _res1.copy().reshape(grid_shape), _res2.copy().reshape(grid_shape)


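# A minimal usage sketch (the projection parameters are illustrative, not
# from the original module): Proj_MP is constructed and called like
# pyproj.Proj, with the work split across worker processes.
#
#     import numpy as np
#     lons = np.linspace(-10.0, 10.0, 1000).reshape((100, 10))
#     lats = np.linspace(40.0, 60.0, 1000).reshape((100, 10))
#     proj = Proj_MP(proj='laea', lat_0=52.0, lon_0=0.0)
#     x, y = proj(lons, lats, nprocs=4)     # outputs keep the input shape

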
class Cartesian(object):
    """Cartesian coordinates."""

    def __init__(self, *args, **kwargs):
        pass

    def transform_lonlats(self, lons, lats):
        """Transform longitudes and latitudes to cartesian coordinates."""
        if np.issubdtype(lons.dtype, np.integer):
            lons = lons.astype(np.float64)
        coords = np.zeros((lons.size, 3), dtype=lons.dtype)
        if ne:
            deg2rad = np.pi / 180  # noqa: F841
            coords[:, 0] = ne.evaluate("R*cos(lats*deg2rad)*cos(lons*deg2rad)")
            coords[:, 1] = ne.evaluate("R*cos(lats*deg2rad)*sin(lons*deg2rad)")
            coords[:, 2] = ne.evaluate("R*sin(lats*deg2rad)")
        else:
            coords[:, 0] = R * np.cos(np.deg2rad(lats)) * np.cos(np.deg2rad(lons))
            coords[:, 1] = R * np.cos(np.deg2rad(lats)) * np.sin(np.deg2rad(lons))
            coords[:, 2] = R * np.sin(np.deg2rad(lats))
        return coords


Cartesian_MP = Cartesian


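# A quick sanity check of the spherical formula above (not part of the
# original module): with R = 6370997.0 m, lon=0/lat=0 maps to (R, 0, 0)
# and lon=90/lat=0 maps to (0, R, 0), up to floating-point rounding.
#
#     cart = Cartesian()
#     xyz = cart.transform_lonlats(np.array([0.0, 90.0]),
#                                  np.array([0.0, 0.0]))
#     # xyz is approximately [[6370997., 0., 0.], [0., 6370997., 0.]]

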
def _run_jobs(target, args, nprocs):
    """Run a pool of worker processes."""
    # return status in shared memory
    # access to these values is serialized automatically
    ierr = mp.Value(ctypes.c_int, 0)
    warn_msg = mp.Array(ctypes.c_char, 1024)

    args.extend((ierr, warn_msg))

    pool = [mp.Process(target=target, args=args) for n in range(nprocs)]
    for p in pool:
        p.start()
    for p in pool:
        p.join()
    if ierr.value != 0:
        raise RuntimeError('%d errors in worker processes. Last one reported:\n%s' %
                           (ierr.value, warn_msg.value.decode()))

# This is executed in an external process:
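# each worker below receives its regular arguments plus the shared
# (ierr, warn_msg) pair appended by _run_jobs; on any exception it increments
# ierr and stores the message, so _run_jobs can raise a single RuntimeError
# after all workers have joined.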


def _parallel_query(scheduler,  # scheduler for load balancing
                    # data needed to reconstruct the kd-tree
                    data, ndata, ndim, leafsize,
                    x, nx, d, i,  # query data and results
                    k, eps, p, dub,  # auxiliary query parameters
                    ierr, warn_msg):  # return values (0 on success)

    try:
        # View shared memory as ndarrays.
        _data = shmem_as_ndarray(data).reshape((ndata, ndim))
        _x = shmem_as_ndarray(x).reshape((nx, ndim))
        if k == 1:
            _d = shmem_as_ndarray(d)
            _i = shmem_as_ndarray(i)
        else:
            _d = shmem_as_ndarray(d).reshape((nx, k))
            _i = shmem_as_ndarray(i).reshape((nx, k))

        # Reconstruct the kd-tree from the shared-memory data.
        import scipy.spatial as sp
        kdtree = sp.cKDTree(_data, leafsize=leafsize)

        # Query for nearest neighbours, using slice ranges
        # from the load balancer.
        for s in scheduler:
            if k == 1:
                _d[s], _i[s] = kdtree.query(_x[s, :], k=1, eps=eps, p=p,
                                            distance_upper_bound=dub)
            else:
                _d[s, :], _i[s, :] = kdtree.query(_x[s, :], k=k, eps=eps, p=p,
                                                  distance_upper_bound=dub)
    # An error occurred: increment the return value ierr.
    # Access to ierr is serialized by multiprocessing.
    except Exception as e:
        ierr.value += 1
        warn_msg.value = str(e).encode()


def _parallel_proj(scheduler, data1, data2, res1, res2, proj_args, proj_kwargs,
                   inverse, radians, errcheck, ierr, warn_msg):
    try:
        # View shared memory as ndarrays.
        _data1 = shmem_as_ndarray(data1)
        _data2 = shmem_as_ndarray(data2)
        _res1 = shmem_as_ndarray(res1)
        _res2 = shmem_as_ndarray(res2)

        # Initialise pyproj
        proj = pyproj.Proj(*proj_args, **proj_kwargs)

        # Reproject the data segment
        for s in scheduler:
            _res1[s], _res2[s] = proj(_data1[s], _data2[s], inverse=inverse,
                                      radians=radians, errcheck=errcheck)

    # An error occurred: increment the return value ierr.
    # Access to ierr is serialized by multiprocessing.
    except Exception as e:
        ierr.value += 1
        warn_msg.value = str(e).encode()


def _parallel_transform(scheduler, lons, lats, n, coords, ierr, warn_msg):
    try:
        # View shared memory as ndarrays.
        _lons = shmem_as_ndarray(lons)
        _lats = shmem_as_ndarray(lats)
        _coords = shmem_as_ndarray(coords).reshape((n, 3))

        # Transform to cartesian coordinates
        for s in scheduler:
            _coords[s, 0] = R * \
                np.cos(np.radians(_lats[s])) * np.cos(np.radians(_lons[s]))
            _coords[s, 1] = R * \
                np.cos(np.radians(_lats[s])) * np.sin(np.radians(_lons[s]))
            _coords[s, 2] = R * np.sin(np.radians(_lats[s]))
    # An error occurred: increment the return value ierr.
    # Access to ierr is serialized by multiprocessing.
    except Exception as e:
        ierr.value += 1
        warn_msg.value = str(e).encode()
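

# A hedged sketch of how _parallel_transform could be driven (this module
# defines no such driver; the input arrays and nprocs value are illustrative,
# following the same pattern as cKDTree_MP.query and Proj_MP.__call__ above):
#
#     n = lons.size
#     shmem_lons = mp.RawArray(ctypes.c_double, n)
#     shmem_lats = mp.RawArray(ctypes.c_double, n)
#     shmem_coords = mp.RawArray(ctypes.c_double, n * 3)
#     shmem_as_ndarray(shmem_lons)[:] = lons.ravel()
#     shmem_as_ndarray(shmem_lats)[:] = lats.ravel()
#     scheduler = Scheduler(n, 4, chunk=None, schedule='guided')
#     _run_jobs(_parallel_transform,
#               [scheduler, shmem_lons, shmem_lats, n, shmem_coords], 4)
#     cartesian = shmem_as_ndarray(shmem_coords).reshape((n, 3)).copy()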