ssec / sift / 13528712524

25 Feb 2025 06:50PM UTC · coverage: 29.691% (-20.2%) from 49.871%
Build 13528712524 · push · GitHub (web-flow)

Merge pull request #437 from ameraner/fix_export_image_float

Deactivate the export-image and RGB-config tests to avoid segfaults in the test suite, and add explicit float casting for the Fraction call to fix the export tests.

0 of 1 new or added line in 1 file covered (0.0%).
2747 existing lines in 33 files are now uncovered.
4386 of 14772 relevant lines covered (29.69%).
0.59 hits per line.

Source File: /uwsift/workspace/simple_workspace.py (file coverage: 18.83%)
import logging
import os
from collections import ChainMap
from datetime import datetime
from typing import Generator, Mapping, Optional, Tuple
from uuid import UUID

import numpy as np
import satpy.readers

from uwsift import CLEANUP_FILE_CACHE, config
from uwsift.common import Info, Kind, State

from .importer import SatpyImporter, aImporter
from .metadatabase import Content, ContentImage, Product
from .workspace import ActiveContent, BaseWorkspace, frozendict

LOG = logging.getLogger(__name__)


class SimpleWorkspace(BaseWorkspace):
    """
    Data management object for the monitoring use case.

    Unlike CachingWorkspace, SimpleWorkspace has no database in which the
    datasets are saved, so every loaded dataset is only available while the
    software is running.

    SimpleWorkspace works with Datasets. It has one dictionary for the
    Product objects and one dictionary for the Content objects, each keyed
    by UUID.
    """

    def __init__(self, directory_path: str):
        super(SimpleWorkspace, self).__init__(directory_path)

        self.products: dict = {}
        self.contents: dict = {}
        self._available: dict = {}

        self.remove_content_data_from_cache_dir_checked()
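    # Note: all three dicts are keyed by UUID -- self.products maps
    # UUID -> Product, self.contents maps UUID -> Content, and self._available
    # maps UUID -> ActiveContent (memmap-backed arrays attached on demand).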

    @property
    def _S(self):
        return None

    def clear_workspace_content(self):
        """Remove binary files from workspace and workspace database."""
        LOG.info("Clearing workspace contents...")
        self.contents = {}
        self.products = {}

    #
    # data array handling
    #

    def _activate_content(self, c: Content) -> ActiveContent:
        self._available[c.uuid] = zult = ActiveContent(self.cache_dir, c, self.get_info(c.uuid))
        c.touch()
        c.product.touch()
        self.remove_content_data_from_cache_dir_checked(c.uuid)
        return zult

    def _cached_arrays_for_content(self, c: Content):
        """
        Attach the cached data indicated in Content, unless it has been
        attached already and is in _available.
        Touch the content and product in the database to appease the LRU gods.
        :param c: metadatabase Content object for session attached to current thread
        :return: workspace_content_arrays
        """
        cache_entry = self._available.get(c.uuid)
        return cache_entry or self._activate_content(c)
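    # Illustrative sketch (not part of the module; `ws` and `uuid` are
    # hypothetical): because self._available is consulted first, repeated
    # reads of one UUID should reuse the attached arrays rather than
    # re-attaching the memmap:
    #
    #   arr_first = ws.get_content(uuid)   # attaches ActiveContent, caches it
    #   arr_again = ws.get_content(uuid)   # served from self._available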

    # FIXME: Use code from CachingWorkspace._remove_content_files_from_workspace?
    def remove_content_data_from_cache_dir_checked(self, uuid: Optional[UUID] = None):
        """Delete numpy.memmap cache files if cache cleanup is enabled
        (CLEANUP_FILE_CACHE): either all existing cache files, or only the
        cache files belonging to the given uuid.

        If a PermissionError occurs, the file that triggered the error is skipped.
        """
        if CLEANUP_FILE_CACHE:
            for file in os.listdir(self.cache_dir):
                try:
                    if uuid is not None:
                        if file.startswith(str(uuid)):
                            os.remove(os.path.join(self.cache_dir, file))
                    else:
                        os.remove(os.path.join(self.cache_dir, file))
                except PermissionError as e:
                    LOG.debug(f"Can't delete numpy memmap cache file {file}: {e}")

    #
    # often-used queries
    #

    def _product_with_uuid(self, session, uuid: UUID) -> Optional[Product]:
        return self.products.get(uuid, None)

    def _product_overview_content(
        self, session, prod: Optional[Product] = None, uuid: Optional[UUID] = None, kind: Kind = Kind.IMAGE
    ) -> Optional[Content]:
        return self.contents.get(uuid, None)

    def _product_native_content(
        self, session, prod: Optional[Product] = None, uuid: Optional[UUID] = None, kind: Kind = Kind.IMAGE
    ) -> Optional[Content]:
        return self.contents.get(uuid, None)
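    # Note: SimpleWorkspace keeps exactly one Content per UUID, so the
    # "overview" and "native" lookups above are intentionally identical; the
    # distinction is only meaningful for CachingWorkspace.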

    #
    # combining queries with data content
    #

    def _overview_content_for_uuid(self, uuid: UUID, kind: Kind = Kind.IMAGE) -> np.memmap:
        ovc = self._product_overview_content(None, uuid=uuid, kind=kind)
        assert ovc is not None  # nosec B101
        arrays = self._cached_arrays_for_content(ovc)
        return arrays.data

    #
    # workspace file management
    #

    @property
    def _total_workspace_bytes(self):
        return None

    def _all_product_uuids(self) -> list:
        return [self.products[p] for p in self.products]
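    # Note: despite its name, _all_product_uuids() returns the Product objects
    # (the dict values), not the UUID keys; get_info() below only uses it for
    # error logging.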

    def get_info(self, info_or_uuid, lod=None) -> Optional[frozendict]:
        """
        Get the metadata dictionary for the Product referenced by info_or_uuid.
        :param info_or_uuid: existing dataset info dictionary containing a UUID, or the UUID directly
        :param lod: desired level of detail to focus
        :return: metadata access with mapping semantics, to be treated as read-only
        """
        # FUTURE: deprecate this
        if isinstance(info_or_uuid, str):
            uuid = UUID(info_or_uuid)
        elif not isinstance(info_or_uuid, UUID):
            uuid = info_or_uuid[Info.UUID]
        else:
            uuid = info_or_uuid

        prod = self._product_with_uuid(None, uuid)
        if not prod:  # then it hasn't had its metadata scraped
            LOG.error("no info available for UUID {}".format(info_or_uuid))
            LOG.error("known products: {}".format(repr(self._all_product_uuids())))
            return None
        kind = prod.info[Info.KIND]
        native_content = self._product_native_content(None, prod=prod, uuid=uuid, kind=kind)
        if native_content is not None:
            # FUTURE: this is especially saddening; upgrade to a finer-grained
            # query and/or deprecate .get_info.
            # Once upon a time, our model was that product == content and
            # shares a UUID with the dataset. If content is available, we want
            # to provide native content metadata along with the product
            # metadata, specifically because a lot of client code assumes that
            # resource == product == content and that singular navigation
            # (e.g. cell_size) is the norm.
            # FIXME DEBUG <- since commit 3576ff0122bd534f83422ce19479d40b7dc9e5b0
            assert kind in [Kind.LINES, Kind.POINTS] or native_content.info[Info.CELL_WIDTH] is not None  # nosec B101
            return frozendict(ChainMap(native_content.info, prod.info))
        # mapping semantics for database fields as well as key-value fields;
        # flatten to one namespace, read-only
        return frozendict(prod.info)
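    # Illustrative note on the ChainMap above: lookups consult
    # native_content.info first and fall back to prod.info, so content
    # metadata shadows product metadata for duplicate keys, e.g.:
    #
    #   from collections import ChainMap
    #   ChainMap({"cell_width": 2000.0}, {"cell_width": 1000.0, "kind": "IMAGE"})["cell_width"]
    #   # -> 2000.0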

    def purge_content_for_product_uuids(self, uuids: list, also_products=False):
        pass

    def close(self):
        pass

    def bgnd_task_complete(self):
        pass

    def get_metadata(self, uuid_or_path):
        if isinstance(uuid_or_path, UUID):
            return self.get_info(uuid_or_path)  # get product metadata
        else:
            return None

    def collect_product_metadata_for_paths(
        self, paths: list, **importer_kwargs
    ) -> Generator[Tuple[int, frozendict], None, None]:
        """Start loading URI data into the workspace asynchronously.

        Args:
            paths (list): String paths to open and get metadata for
            **importer_kwargs: Keyword arguments to pass to the lower-level
                importer class.

        Returns: sequence of read-only info dictionaries

        """
        importers = []
        num_products = 0
        if "reader" not in importer_kwargs:
            # without a reader in importer_kwargs, the Satpy importer can't be used
            return None

        # Pass paths to the Satpy importer and see what happens
        if paths:
            if "reader" not in importer_kwargs:
                # unreachable given the early return above; kept for parity
                raise NotImplementedError("Reader discovery is not currently implemented in the satpy importer.")
            if "scenes" in importer_kwargs:
                # another component already created the satpy scenes, use those
                scenes = importer_kwargs.pop("scenes")
                scenes = scenes.items()
            else:
                scenes = [(paths, None)]
            for paths, scene in scenes:
                imp = SatpyImporter
                these_kwargs = importer_kwargs.copy()
                these_kwargs["scene"] = scene
                hauler = imp(paths, database_session=None, workspace_cwd=self.cache_dir, **these_kwargs)
                hauler.merge_resources()
                importers.append(hauler)
                num_products += hauler.num_products

        for hauler in importers:
            for prod in hauler.merge_products():
                assert prod is not None  # nosec B101
                # add to-be-imported filenames to check for possible merge targets, but
                # do not include these filenames in the product info
                extended_prod_info = dict(prod.info)
                extended_prod_info["paths"] = hauler.filenames
                zult = frozendict(extended_prod_info)
                # merge the product into our database session, since it may
                # belong to import_session
                # self._S.merge(prod)
                self.products[prod.uuid] = prod
                # LOG.debug('yielding product metadata for {}'.format(
                #     zult.get(Info.DISPLAY_NAME, '?? unknown name ??')))
                yield num_products, zult
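    # Usage sketch (illustrative; `ws`, `files`, and the reader name are
    # hypothetical, not taken from this module):
    #
    #   ws = SimpleWorkspace("/tmp/uwsift_cache")
    #   for count, info in ws.collect_product_metadata_for_paths(files, reader="abi_l1b"):
    #       data = ws.import_product_content(info[Info.UUID])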

    def import_product_content(
        self,
        uuid: UUID,
        prod: Optional[Product] = None,
        allow_cache=True,
        merge_target_uuid: Optional[UUID] = None,
        **importer_kwargs,
    ) -> Optional[np.memmap]:
        if prod is None and uuid is not None:
            prod = self._product_with_uuid(None, uuid)
        assert prod  # nosec B101 # suppress mypy [union-attr]

        if merge_target_uuid:
            merge_target = self._product_with_uuid(None, merge_target_uuid)
            importer_kwargs["merge_target"] = merge_target
            self.products.pop(uuid, None)
        else:
            importer_kwargs["merge_target"] = None

        self.set_product_state_flag(prod.uuid, State.ARRIVING)
        default_prod_kind = prod.info[Info.KIND]

        if merge_target_uuid and len(prod.content):
            LOG.info("product already has content available, using that rather than re-importing")
            ovc = self._product_overview_content(None, uuid=uuid, kind=default_prod_kind)
            assert ovc is not None  # nosec B101
            arrays = self._cached_arrays_for_content(ovc)
            return arrays.data

        truck = aImporter.from_product(prod, workspace_cwd=self.cache_dir, database_session=None, **importer_kwargs)
        if not truck:
            # aImporter.from_product() didn't return an Importer instance,
            # since all files represent data granules which are already
            # loaded and merged into existing datasets.
            # Thus: nothing to do.
            return None
        metadata = prod.info
        name = metadata[Info.SHORT_NAME]

        gen = truck.begin_import_products(prod)
        nupd = 0
        for update in gen:
            nupd += 1
            # we're now incrementally reading the input file;
            # data updates are coming back to us (eventually asynchronously).
            # Content is in the metadatabase and being updated + committed,
            # including sparsity and coverage arrays
            if update.data is not None:
                # data = update.data
                LOG.info("{} {}: {:.01f}%".format(name, update.stage_desc, update.completion * 100.0))
            if update.content is not None:
                self.contents[update.uuid] = update.content
        LOG.debug("received {} updates during import".format(nupd))
        self._clear_product_state_flag(prod.uuid, State.ARRIVING)

        # make an ActiveContent object from the Content, now that we've imported it
        ac = self._overview_content_for_uuid(
            merge_target_uuid if merge_target_uuid else prod.uuid, kind=default_prod_kind
        )
        if ac is None:
            return None
        return ac.data
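    # Illustrative note on the import loop above: each update yielded by
    # begin_import_products() may carry .data (progress is then logged from
    # .stage_desc and .completion) and/or .content, which is recorded in
    # self.contents under update.uuid.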

    def find_merge_target(self, uuid: UUID, paths, info) -> Optional[Product]:
        """
        Try to find an existing product into which the to-be-imported files
        could be merged.

        :param uuid: uuid of the product which is about to be imported and
                     might be merged with an existing product
        :param paths: the paths which should be imported or merged
        :param info: metadata for the to-be-imported product
        :return: the existing product to merge new content into, or None if no
                 existing product is compatible
        """
        reader = info["reader"]
        group_keys = config.get(f"data_reading.{reader}.group_keys", None)
        for existing_prod in self.products.values():
            # Exclude all products which are incomplete (products which are
            # being imported right now) and products with a different kind or
            # parameter.
            # TODO: when loading granules without resampling after granules
            #  of the same FAMILY have been loaded *with* resampling already,
            #  merging must be prevented.
            #  As an attempt to achieve this, the SHAPE size comparison checks
            #  whether they are compatible for array broadcasting, but that is
            #  a necessary, not a sufficient, condition.
            #  Idea: modify the FAMILY info by adding grid information?
            if (
                not existing_prod.content
                or reader != existing_prod.info["reader"]
                or info[Info.SHAPE][0] > existing_prod.info[Info.SHAPE][0]
                or info[Info.SHAPE][1] != existing_prod.info[Info.SHAPE][1]
                or info[Info.FAMILY] != existing_prod.info[Info.FAMILY]
            ):
                continue

            # If the to-be-imported product seems to be compatible with an
            # existing product, check whether satpy would group the
            # to-be-imported files together with the files already loaded into
            # the existing merge candidate.
            all_files = set(existing_prod.content[0].source_files) if existing_prod.content[0] else set()
            all_files |= set(paths)
            grouped_files = satpy.readers.group_files(all_files, reader=reader, group_keys=group_keys)
            if (
                len(grouped_files) == 1
                and len(grouped_files[0]) == 1
                and reader in grouped_files[0]
                and len(all_files) == len(grouped_files[0][reader])
            ):
                return existing_prod
        return None
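    # Illustrative sketch (file names hypothetical): satpy.readers.group_files()
    # returns one dict per logical scene/time step, mapping reader name to files:
    #
    #   satpy.readers.group_files(all_files, reader="abi_l1b")
    #   # -> [{"abi_l1b": ["OR_ABI...C01...nc", "OR_ABI...C02...nc"]}]
    #
    # The check above accepts a merge target only when every old and new file
    # lands in a single group for this reader, i.e. when satpy would treat
    # them all as one scene.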

    def _create_product_from_array(
        self, info: Mapping, data, namespace=None, codeblock=None
    ) -> Tuple[UUID, Optional[frozendict], np.memmap]:
        """
        Put a created image array into the corresponding data structures within
        the workspace and return the uuid, the updated info, and the memmap of
        the created array.

        Side effects include:
            Write an np.memmap to disk for later retrieval by the workspace,
            update the metadata Product object with the path to the .image
            memmap file, and add the created Content to the workspace's
            `contents`. Finally, import the product into the workspace.

        Args:
            info: mapping of key-value metadata for new product
            data: ndarray with content to store, typically 2D float32
            namespace: {variable: uuid, } for calculation of this data
            codeblock: text, code to run to recalculate this data within namespace

        Returns:
            uuid, info, data: uuid of the new product, its official read-only metadata, and cached
            content ndarray
        """
        if Info.UUID not in info:
            raise ValueError("currently require an Info.UUID be included in product")
        parms = dict(info)
        now = datetime.utcnow()
        parms.update(
            dict(
                atime=now,
                mtime=now,
            )
        )
        P = Product.from_info(parms, symbols=namespace, codeblock=codeblock)
        uuid = P.uuid
        # FUTURE: add expression and namespace information, which would require additional parameters
        ws_filename = "{}.image".format(str(uuid))
        ws_path = os.path.join(self.cache_dir, ws_filename)
        # Write memmap to disk, for later reference by the workspace
        with open(ws_path, "wb+") as fp:
            mm = np.memmap(fp, dtype=data.dtype, shape=data.shape, mode="w+")
            mm[:] = data[:]
        # Update metadata to contain the path to the cached memmap .image file
        parms.update(
            dict(
                lod=ContentImage.LOD_OVERVIEW,
                path=ws_filename,
                dtype=str(data.dtype),
                proj4=info[Info.PROJ],
                resolution=min(info[Info.CELL_WIDTH], info[Info.CELL_HEIGHT]),
            )
        )
        rcls = dict(zip(("rows", "cols", "levels"), data.shape))
        parms.update(rcls)
        LOG.debug("about to create Content with this: {}".format(repr(parms)))

        C = ContentImage.from_info(parms, only_fields=True)
        P.content.append(C)

        self.contents[uuid] = C
        self.products[uuid] = P
        # activate the content we just loaded into the workspace
        overview_data = self._overview_content_for_uuid(uuid)

        return uuid, self.get_info(uuid), overview_data
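    # Minimal sketch of the memmap round trip used above (file path hypothetical):
    #
    #   src = np.arange(6, dtype=np.float32).reshape(2, 3)
    #   with open("/tmp/demo.image", "wb+") as fp:
    #       mm = np.memmap(fp, dtype=src.dtype, shape=src.shape, mode="w+")
    #       mm[:] = src[:]                  # contents are flushed when mm is released
    #   back = np.memmap("/tmp/demo.image", dtype=np.float32, shape=(2, 3), mode="r")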

    def _bgnd_remove(self, uuid: UUID):
        from uwsift.queue import TASK_DOING, TASK_PROGRESS

        yield {TASK_DOING: "purging memory", TASK_PROGRESS: 0.5}
        LOG.debug(f"Products before deletion: {list(self.products.keys())}")
        LOG.debug(f"Contents before deletion: {list(self.contents.keys())}")
        LOG.debug(f"Active Content before deletion: {list(self._available.keys())}")
        self._deactivate_content_for_product(self._product_with_uuid(None, uuid))
        self.contents.pop(uuid, None)
        self.products.pop(uuid, None)
        LOG.debug(f"Products after deletion: {list(self.products.keys())}")
        LOG.debug(f"Contents after deletion: {list(self.contents.keys())}")
        LOG.debug(f"Active Content after deletion: {list(self._available.keys())}")
        yield {TASK_DOING: "purging memory", TASK_PROGRESS: 1.0}

    def get_content(self, info_or_uuid, lod=None, kind: Kind = Kind.IMAGE) -> Optional[np.memmap]:
        """
        By default, get the best-available (closest to native)
        np.ndarray-compatible view of the full dataset.
        :param info_or_uuid: existing dataset info dictionary, or its UUID
        :param lod: desired level of detail to focus (0 for overview)
        :param kind: kind of the data referenced by info_or_uuid
        :return:
        """
        if info_or_uuid is None:
            return None
        elif isinstance(info_or_uuid, UUID):
            uuid = info_or_uuid
        elif isinstance(info_or_uuid, str):
            uuid = UUID(info_or_uuid)
        else:
            uuid = info_or_uuid[Info.UUID]

        # The current implementation, where self.contents is a dict, cannot
        # support multiple contents per Product/UUID, which seems to be
        # prepared in the original implementation, now found in
        # CachingWorkspace.get_content().
        # TODO(FUTURE): Update this in case CachingWorkspace is dropped and/or
        #  multiple contents per Product/UUID are implemented, see below.
        content = self.contents.get(uuid)
        content = content if content and content.info.get(Info.KIND, Kind.IMAGE) == kind else None

        if content is None:
            raise AssertionError("no content in workspace for {}, must re-import".format(uuid))

        # FIXME: find the content for the requested LOD, then return its
        #  ActiveContent - or attach one.
        #  For now, just work with the assumption of one product, one content.
        active_content = self._cached_arrays_for_content(content)
        return active_content.data

    def _deactivate_content_for_product(self, p: Optional[Product]):
        if p is None:
            return
        for c in p.content:
            self._available.pop(c.uuid, None)

    def _get_active_content_by_uuid(self, uuid: UUID) -> Optional[ActiveContent]:
        return self._available.get(uuid)