• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

adybbroe / activefires-pp / 5879585527

16 Aug 2023 01:42PM UTC coverage: 82.998% (+0.7%) from 82.34%
5879585527

Pull #17

github

web-flow
Spell/typo correction

Update activefires_pp/post_processing.py

Co-authored-by: Martin Raspaud <martin.raspaud@smhi.se>
Pull Request #17: Refactor and add sweref99 output

286 of 439 branches covered (65.15%)

179 of 203 new or added lines in 7 files covered. (88.18%)

1 existing line in 1 file now uncovered.

1982 of 2388 relevant lines covered (83.0%)

3.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.99
/activefires_pp/post_processing.py
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3

4
# Copyright (c) 2021 - 2023 Adam Dybbroe
5

6
# Author(s):
7

8
#   Adam Dybbroe <Firstname.Lastname@smhi.se>
9

10
# This program is free software: you can redistribute it and/or modify
11
# it under the terms of the GNU General Public License as published by
12
# the Free Software Foundation, either version 3 of the License, or
13
# (at your option) any later version.
14

15
# This program is distributed in the hope that it will be useful,
16
# but WITHOUT ANY WARRANTY; without even the implied warranty of
17
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18
# GNU General Public License for more details.
19

20
# You should have received a copy of the GNU General Public License
21
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
22

23
"""Post processing on the Active Fire detections."""
4✔
24

25
import socket
4✔
26
from trollsift import Parser, globify
4✔
27
import time
4✔
28
import pandas as pd
4✔
29
from datetime import datetime, timedelta
4✔
30
import numpy as np
4✔
31
import os
4✔
32
from six.moves.urllib.parse import urlparse
4✔
33

34
import logging
4✔
35
import signal
4✔
36
from queue import Empty
4✔
37
from threading import Thread
4✔
38
from posttroll.listener import ListenerContainer
4✔
39
from posttroll.message import Message
4✔
40
from posttroll.publisher import NoisyPublisher
4✔
41
import pyproj
4✔
42
from matplotlib.path import Path
4✔
43
import shapely
4✔
44

45
from activefires_pp.geojson_utils import store_geojson
4✔
46
from activefires_pp.geojson_utils import geojson_feature_collection_from_detections
4✔
47
from activefires_pp.geojson_utils import map_coordinates_in_feature_collection
4✔
48

49
from activefires_pp.utils import datetime_utc2local
4✔
50
from activefires_pp.utils import get_local_timezone_offset
4✔
51
from activefires_pp.config import read_config
4✔
52
from activefires_pp.geometries_from_shapefiles import ShapeGeometry
4✔
53

54
# M-band output:
55
# column 1: latitude of fire pixel (degrees)
56
# column 2: longitude of fire pixel (degrees)
57
# column 3: M13 brightness temperature of fire pixel (K)
58
# column 4: Along-scan fire pixel resolution (km)
59
# column 5: Along-track fire pixel resolution (km)
60
# column 6: detection confidence (%)
61
#           7=low, 8=nominal, 9=high (Imager Resolution)
62
#           0-100 % (Moderate Resolution)
63
# column 7: fire radiative power (MW)
64
# I-band output:
65
# column 1: latitude of fire pixel (degrees)
66
# column 2: longitude of fire pixel (degrees)
67
# column 3: I04 brightness temperature of fire pixel (K)
68
# column 4: Along-scan fire pixel resolution (km)
69
# column 5: Along-track fire pixel resolution (km)
70
# column 6: detection confidence ([7,8,9]->[lo,med,hi])
71
# column 7: fire radiative power (MW)
72
#
73
# Column names for the ascii (CSV-like) output of the CSPP VIIRS Active Fires
# product (see the column description in the comment block above).
COL_NAMES = ["latitude", "longitude", "tb", "along_scan_res", "along_track_res", "conf", "power"]

# Informational text published when a granule holds no fire detections.
NO_FIRES_TEXT = 'No fire detections for this granule'


logger = logging.getLogger(__name__)
# Fiona (used indirectly when reading shapefiles) is chatty below WARNING.
logging.getLogger("fiona").setLevel(logging.WARNING)
80

81

82
class ActiveFiresShapefileFiltering(object):
    """Reading, filtering and writing Active Fire detections.

    Reading either the CSPP VIIRS AF output (txt files) or the Geojson formatted files.
    Filtering for static and false alarms, and/or simply on geographical regions.
    Data is stored in geojson format.
    """

    def __init__(self, filepath=None, afdata=None, platform_name=None, timezone='GMT'):
        """Initialize the ActiveFiresShapefileFiltering class.

        Either *filepath* (path to a CSPP VIIRS-AF ascii file to be read later)
        or *afdata* (an already loaded pandas DataFrame) is expected; when
        *afdata* is given its ``.attrs`` become this instance's metadata.
        """
        self.input_filepath = filepath
        self._afdata = afdata
        if afdata is None:
            self.metadata = {}
        else:
            # Reuse the metadata travelling with the DataFrame.
            self.metadata = afdata.attrs

        self.timezone = timezone
        self.platform_name = platform_name

    def get_af_data(self, filepattern=None, localtime=True):
        """Read the Active Fire results from file - ascii formatted output from CSPP VIIRS-AF.

        If data was already loaded (passed to the constructor or read before),
        it is returned directly with its attrs refreshed from self.metadata.
        Otherwise *filepattern* is required to parse metadata from the filename.
        """
        if self._afdata is not None:
            # Make sure the attrs are populated with metadata instance attribute
            self._afdata.attrs.update(self.metadata)
            return self._afdata

        if not self.input_filepath or not os.path.exists(self.input_filepath):
            # FIXME! Better to raise an exception!?
            return self._afdata

        if not filepattern:
            raise AttributeError("file pattern must be provided in order to be able to read from file!")

        self.metadata = self._get_metadata_from_filename(filepattern)
        self._afdata = _read_data(self.input_filepath)
        self._add_start_and_end_time_to_active_fires_data(localtime)

        return self._afdata

    def _get_metadata_from_filename(self, infile_pattern):
        """From the filename retrieve the metadata such as satellite and sensing time."""
        return get_metadata_from_filename(infile_pattern, self.input_filepath)

    def _add_start_and_end_time_to_active_fires_data(self, localtime):
        """Add start and end time to active fires data.

        The (naive) times are broadcast to one value per detection row; when
        *localtime* is True the UTC times from the filename are first shifted
        to self.timezone.
        """
        if localtime:
            logger.info("Convert to local time zone!")
            starttime = datetime_utc2local(self.metadata['start_time'], self.timezone)
            endtime = datetime_utc2local(self.metadata['end_time'], self.timezone)
        else:
            # 'GMT' offset is zero, so this keeps the times effectively in UTC.
            starttime = datetime_utc2local(self.metadata['start_time'], 'GMT')
            endtime = datetime_utc2local(self.metadata['end_time'], 'GMT')

        # Strip tzinfo so the values can be cast to numpy datetime64 below.
        starttime = starttime.replace(tzinfo=None)
        endtime = endtime.replace(tzinfo=None)

        self._afdata['starttime'] = np.repeat(starttime, len(self._afdata)).astype(np.datetime64)
        self._afdata['endtime'] = np.repeat(endtime, len(self._afdata)).astype(np.datetime64)

        logger.info('Start and end times: %s %s',
                    str(self._afdata['starttime'][0]),
                    str(self._afdata['endtime'][0]))

    def _apply_timezone_offset(self, obstime):
        """Apply the time zone offset to the datetime objects."""
        obstime_offset = get_local_timezone_offset(self.timezone)
        return np.repeat(obstime.replace(tzinfo=None) + obstime_offset,
                         len(self._afdata)).astype(np.datetime64)

    def fires_filtering(self, shapefile, start_geometries_index=1, inside=True):
        """Remove fires outside National borders or filter out potential false detections.

        If *inside* is True the filtering will keep those detections that are inside the polygon.
        If *inside* is False the filtering will disregard the detections that are inside the polygon.
        """
        detections = self._afdata

        lons = detections.longitude.values
        lats = detections.latitude.values

        toc = time.time()
        points_inside = get_global_mask_from_shapefile(shapefile, (lons, lats), start_geometries_index)
        logger.debug("Time used checking inside polygon - mpl path method: %f", time.time() - toc)

        # Keep rows whose mask value equals the requested polarity.
        self._afdata = detections[points_inside == inside]

        if len(self._afdata) == 0:
            logger.debug("No fires after filtering on Polygon...")
        else:
            logger.debug("Number of detections after filtering on Polygon: %d", len(self._afdata))

    def get_regional_filtermasks(self, shapefile, globstr):
        """Get the regional filter masks from the shapefile.

        Returns a dict keyed on the region name ('Testomr' attribute), each
        value holding the boolean 'mask' over the detections, the region
        'attributes', and the 'all/some_inside_test_area' summary flags.
        """
        detections = self._afdata

        lons = detections.longitude.values
        lats = detections.latitude.values

        logger.debug("Before ShapeGeometry instance - shapefile name = %s" % str(shapefile))
        logger.debug("Shape file glob-string = %s" % str(globstr))
        shape_geom = ShapeGeometry(shapefile, globstr)
        shape_geom.load()

        # Project lon/lat detections into the shapefile's own CRS before the
        # point-in-polygon tests.
        p__ = pyproj.Proj(shape_geom.proj4str)
        metersx, metersy = p__(lons, lats)
        points = np.vstack([metersx, metersy]).T

        regional_masks = {}

        for attr, geometry in zip(shape_geom.attributes, shape_geom.geometries):
            test_omr = attr['Testomr']
            all_inside_test_omr = False
            some_inside_test_omr = False
            logger.debug(u'Test area: {}'.format(str(test_omr)))

            regional_masks[test_omr] = {'mask': None, 'attributes': attr}

            if isinstance(geometry, shapely.geometry.multipolygon.MultiPolygon):
                regional_masks[test_omr]['mask'] = get_mask_from_multipolygon(points, geometry)
            else:
                # Single polygon: one matplotlib Path containment test suffices.
                shape = geometry
                pth = Path(shape.exterior.coords)
                regional_masks[test_omr]['mask'] = pth.contains_points(points)

            if sum(regional_masks[test_omr]['mask']) == len(points):
                all_inside_test_omr = True
                some_inside_test_omr = True
                logger.debug("All points inside test area!")
            elif sum(regional_masks[test_omr]['mask']) > 0:
                some_inside_test_omr = True
                logger.debug("Some points inside test area!")

            regional_masks[test_omr]['all_inside_test_area'] = all_inside_test_omr
            regional_masks[test_omr]['some_inside_test_area'] = some_inside_test_omr

        return regional_masks
219

220

221
def _read_data(filepath):
    """Load CSPP VIIRS-AF ascii output into a pandas DataFrame.

    Lines starting with '#' are treated as comments and skipped; the fixed
    set of column names in COL_NAMES is applied.
    """
    with open(filepath, 'r') as stream:
        frame = pd.read_csv(stream, index_col=None, header=None,
                            comment='#', names=COL_NAMES)
    return frame
225

226

227
def get_metadata_from_filename(infile_pattern, filepath):
    """From the filename and its pattern get basic metadata of the satellite observations.

    Parameters:
        infile_pattern: trollsift pattern string describing the filename.
        filepath: path to the file; only its basename is parsed.

    Returns a dict with the parsed fields plus a derived 'end_time' datetime,
    or None when the filename does not match the pattern.
    """
    p__ = Parser(infile_pattern)
    fname = os.path.basename(filepath)
    try:
        res = p__.parse(fname)
    except ValueError:
        # Previously this failure was swallowed silently ("# Do something!").
        # Log it so non-matching files can be traced; keep returning None for
        # backwards compatibility with callers.
        logger.warning("Failed to parse filename %s with pattern %s", fname, infile_pattern)
        return None

    # Fix the end time: the filename only carries the end hour/minute/second,
    # so build a full datetime on the start day and roll over midnight if the
    # observation spans into the next day.
    endtime = datetime(res['start_time'].year, res['start_time'].month,
                       res['start_time'].day, res['end_hour'].hour, res['end_hour'].minute,
                       res['end_hour'].second)
    if endtime < res['start_time']:
        endtime = endtime + timedelta(days=1)

    res['end_time'] = endtime

    return res
247

248

249
def store(output_filename, detections):
    """Store the filtered AF detections on disk.

    Writes the detections DataFrame as CSV (no index column) and returns the
    output filename, or None when there is nothing to write.
    """
    if not len(detections):
        logger.debug("No detections to save!")
        return None

    detections.to_csv(output_filename, index=False)
    return output_filename
257

258

259
def get_mask_from_multipolygon(points, geometry, start_idx=1):
    """Get mask for points from a shapely Multipolygon.

    Parameters:
        points: (N, 2) array of projected x/y coordinates.
        geometry: a shapely MultiPolygon.
        start_idx: index of the first constituent polygon to test after the
            first one (allows skipping e.g. an outer hull polygon).

    Returns a boolean array, True for points inside any tested polygon.
    Stops early once every point is covered.
    """
    # Materialize the geometry sequence once. The previous code sliced
    # ``geometry.geoms[start_idx:]`` and then accessed ``.geoms`` on the
    # result; slicing a GeometrySequence returns a plain list in some shapely
    # versions (no ``.geoms`` attribute), so iterate a list directly instead.
    geoms = list(geometry.geoms)

    shape = geoms[0]
    pth = Path(shape.exterior.coords)
    mask = pth.contains_points(points)

    if sum(mask) == len(points):
        # All points already inside the first polygon - no need to go on.
        return mask

    for shape in geoms[start_idx:]:
        pth = Path(shape.exterior.coords)
        mask = np.logical_or(mask, pth.contains_points(points))
        if sum(mask) == len(points):
            break

    return mask
276

277

278
def get_global_mask_from_shapefile(shapefile, lonlats, start_geom_index=0):
    """Given geographical (lon,lat) points get a mask to apply when filtering.

    The shapefile is expected to contain a single (multi-)polygon; the
    returned boolean mask flags the points falling inside it.
    """
    longitudes, latitudes = lonlats
    logger.debug("Getting the global mask from file: shapefile file path = %s", str(shapefile))

    border_geom = ShapeGeometry(shapefile)
    border_geom.load()

    # Project the lon/lat points into the shapefile's own coordinate system
    # before doing the point-in-polygon tests.
    proj = pyproj.Proj(border_geom.proj4str)
    xmeters, ymeters = proj(longitudes, latitudes)
    sample_points = np.vstack([xmeters, ymeters]).T

    # There is only one geometry/multi-polygon!
    return get_mask_from_multipolygon(sample_points, border_geom.geometries[0], start_geom_index)
294

295

296
class ActiveFiresPostprocessing(Thread):
    """The active fires post processor.

    Listens for posttroll messages announcing CSPP VIIRS-AF output files,
    filters the detections (national borders, false-alarm mask, optional
    regional masks), stores geojson results and publishes messages.
    """

    def __init__(self, configfile, shp_borders, shp_mask, regional_filtermask=None):
        """Initialize the active fires post processor class.

        Parameters:
            configfile: path to the YAML configuration file.
            shp_borders: shapefile with the national borders.
            shp_mask: shapefile used to mask out false detections.
            regional_filtermask: optional shapefile for regional filtering.
        """
        super().__init__()
        self.shp_borders = shp_borders
        self.shp_filtermask = shp_mask

        self.regional_filtermask = regional_filtermask
        self.configfile = configfile
        self.options = {}

        config = read_config(self.configfile)
        self._set_options_from_config(config)

        self.host = socket.gethostname()
        self.timezone = self.options.get('timezone', 'GMT')

        self.input_topic = self.options['subscribe_topics'][0]
        self.output_topic = self.options['publish_topic']
        self.infile_pattern = self.options.get('af_pattern_ibands')
        self.outfile_pattern_national = self.options.get('geojson_file_pattern_national')
        self.outfile_pattern_national_sweref99 = self.options.get('geojson_file_pattern_national_sweref99')
        self.outfile_pattern_regional = self.options.get('geojson_file_pattern_regional')
        self.output_dir = self.options.get('output_dir', '/tmp')
        self.filepath_detection_id_cache = self.options.get('filepath_detection_id_cache')

        frmt = self.options['regional_shapefiles_format']
        self.regional_shapefiles_globstr = globify(frmt)

        self._fire_detection_id = None
        self._initialize_fire_detection_id()

        self.listener = None
        self.publisher = None
        self.loop = False
        self._setup_and_start_communication()

    def _setup_and_start_communication(self):
        """Set up the Posttroll communication and start the publisher."""
        logger.debug("Starting up... Input topic: %s", self.input_topic)
        now = datetime_utc2local(datetime.now(), self.timezone)
        logger.debug("Output times for timezone: {zone} Now = {time}".format(zone=str(self.timezone), time=now))

        self._check_borders_shapes_exists()

        self.listener = ListenerContainer(topics=[self.input_topic])
        self.publisher = NoisyPublisher("active_fires_postprocessing")
        self.publisher.start()
        self.loop = True
        # Allow a clean shutdown on SIGTERM (e.g. from a supervisor).
        signal.signal(signal.SIGTERM, self.signal_shutdown)

    def _set_options_from_config(self, config):
        """From the configuration on disk set the option dictionary, holding all metadata for processing.

        Only scalar (non-dict) top-level config entries are copied; the
        comma-separated topic strings are split into lists, dropping empty
        entries.
        """
        for item in config:
            if not isinstance(config[item], dict):
                self.options[item] = config[item]

        # BUGFIX: the previous code removed empty entries from the topic list
        # while iterating over it, which silently skips elements when two
        # empty entries are adjacent. Filter with a comprehension instead.
        if isinstance(self.options.get('subscribe_topics'), str):
            subscribe_topics = [topic for topic in
                                self.options.get('subscribe_topics').split(',') if topic]
            self.options['subscribe_topics'] = subscribe_topics

        if isinstance(self.options.get('publish_topics'), str):
            publish_topics = [topic for topic in
                              self.options.get('publish_topics').split(',') if topic]
            self.options['publish_topics'] = publish_topics

    def signal_shutdown(self, *args, **kwargs):
        """Shutdown the Active Fires postprocessing."""
        self.close()

    def check_incoming_message_and_get_filename(self, msg):
        """Check the message content and return filename if okay.

        Returns None (after publishing no-fires messages where appropriate)
        when the message cannot be processed.
        """
        if msg.type not in ['file', 'collection', 'dataset']:
            logger.debug("Message type not supported: %s", str(msg.type))
            return None

        filename = get_filename_from_uri(msg.data.get('uri'))
        if not os.path.exists(filename):
            logger.warning("File does not exist: %s", filename)
            return None

        file_ok = check_file_type_okay(msg.data.get('type'))
        if not file_ok:
            # A non-txt file signals "no fires" - tell the subscribers.
            output_messages = self._generate_no_fires_messages(msg, NO_FIRES_TEXT)
            for output_msg in output_messages:
                logger.debug("Sending message: %s", str(output_msg))
                self.publisher.send(str(output_msg))
            return None

        return filename

    def _national_save_and_publish(self, feature_collection, ndata, af_shapeff, msg, sweref99=False):
        """Take the feature collection and store the results in a Geojson file and publish.

        When *sweref99* is True the SWEREF99-specific output file pattern is
        used for the national product.
        """
        if feature_collection is None:
            logger.info("No geojson file created, number of fires after filtering = %d", ndata)
            output_messages = self._generate_no_fires_messages(msg,
                                                               'No true fire detections inside National borders')  # noqa
            # BUGFIX: these messages were previously generated but never
            # published - send them so downstream consumers get notified.
            for output_msg in output_messages:
                logger.debug("Sending message: %s", str(output_msg))
                self.publisher.send(str(output_msg))
            return

        fmda = af_shapeff.metadata
        if sweref99:
            pout = Parser(self.outfile_pattern_national_sweref99)
        else:
            pout = Parser(self.outfile_pattern_national)

        out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
        logger.debug("Output file path = %s", out_filepath)

        store_geojson(out_filepath, feature_collection)
        output_messages = self.get_output_messages(out_filepath, msg, ndata, sweref99=sweref99)

        for output_msg in output_messages:
            if output_msg:
                logger.debug("Sending message: %s", str(output_msg))
                self.publisher.send(str(output_msg))

    def do_postprocessing_on_message(self, msg, filename):
        """Do the fires post processing on a message."""
        platform_name = msg.data.get('platform_name')
        af_shapeff = ActiveFiresShapefileFiltering(filename, platform_name=platform_name,
                                                   timezone=self.timezone)
        afdata = af_shapeff.get_af_data(self.infile_pattern)
        if len(afdata) == 0:
            output_messages = self._generate_no_fires_messages(msg, NO_FIRES_TEXT)
            for output_msg in output_messages:
                logger.debug("Sending message: %s", str(output_msg))
                self.publisher.send(str(output_msg))
            return

        afdata = self.fires_filtering(msg, af_shapeff)
        logger.debug("After fires_filtering...: Number of fire detections = %d", len(afdata))
        if len(afdata) == 0:
            logger.debug("No fires - so no regional filtering to be done!")
            return

        # It is here that we should add a unique day-ID to each of the detections!
        afdata = self.add_unique_day_id(afdata)
        self.save_id_to_file()

        # 1) Create geojson feature collection
        # 2) Dump geojson data to disk
        feature_collection = geojson_feature_collection_from_detections(afdata,
                                                                        platform_name=af_shapeff.platform_name)

        self._national_save_and_publish(feature_collection, len(afdata), af_shapeff, msg)

        # Also store/publish the national product in SWEREF99 TM (EPSG:3006).
        sweref99_fc = map_coordinates_in_feature_collection(feature_collection, '3006')
        self._national_save_and_publish(sweref99_fc, len(afdata), af_shapeff, msg, sweref99=True)

        # Do the regional filtering now:
        if not self.regional_filtermask:
            logger.info("No regional filtering is attempted.")
            return

        # FIXME! If afdata is empty (len=0) then it seems all data are inside all regions!
        af_shapeff = ActiveFiresShapefileFiltering(afdata=afdata, platform_name=platform_name,
                                                   timezone=self.timezone)
        regional_fmask = af_shapeff.get_regional_filtermasks(self.regional_filtermask,
                                                             globstr=self.regional_shapefiles_globstr)
        regional_messages = self.regional_fires_filtering_and_publishing(msg, regional_fmask, af_shapeff)
        for region_msg in regional_messages:
            logger.debug("Sending message: %s", str(region_msg))
            self.publisher.send(str(region_msg))

    def run(self):
        """Run the AF post processing."""
        while self.loop:
            try:
                msg = self.listener.output_queue.get(timeout=1)
                logger.debug("Message: %s", str(msg.data))
            except Empty:
                continue
            else:
                filename = self.check_incoming_message_and_get_filename(msg)
                if not filename:
                    continue

                self.do_postprocessing_on_message(msg, filename)

    def regional_fires_filtering_and_publishing(self, msg, regional_fmask, afsff_obj):
        """From the regional-fires-filter-mask and the fire detection data send regional messages.

        Returns the list of messages to publish (one per region with
        detections for which a geojson file could be written).
        """
        logger.debug("Perform regional masking on VIIRS AF detections and publish accordingly.")

        afdata = afsff_obj.get_af_data()
        fmda = afsff_obj.metadata

        fmda['platform'] = afsff_obj.platform_name

        pout = Parser(self.outfile_pattern_regional)

        output_messages = []
        regions_with_detections = 0
        for region_name in regional_fmask:
            if not regional_fmask[region_name]['some_inside_test_area']:
                continue

            regions_with_detections = regions_with_detections + 1
            fmda['region_name'] = regional_fmask[region_name]['attributes']['Kod_omr']

            out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
            logger.debug("Output file path = %s", out_filepath)
            data_in_region = afdata[regional_fmask[region_name]['mask']]

            feature_collection = geojson_feature_collection_from_detections(data_in_region,
                                                                            platform_name=fmda['platform'])
            if feature_collection is None:
                logger.warning("Something wrong happended storing regional " +
                               "data to Geojson - area: {name}".format(name=str(region_name)))
                continue

            store_geojson(out_filepath, feature_collection)

            outmsg = self._generate_output_message(out_filepath, msg, regional_fmask[region_name])
            output_messages.append(outmsg)
            logger.info("Geojson file created! Number of fires in region = %d", len(data_in_region))

        logger.debug("Regional masking done. Number of regions with fire " +
                     "detections on this granule: %s" % str(regions_with_detections))
        return output_messages

    def fires_filtering(self, msg, af_shapeff):
        """Read Active Fire data and perform spatial filtering removing false detections.

        Do the national filtering first, and then filter out potential false
        detections by the special mask for that.

        """
        logger.debug("Read VIIRS AF detections and perform quality control and spatial filtering")

        fmda = af_shapeff.metadata
        # metdata contains time and everything but it is not being transfered to the dataframe.attrs

        pout = Parser(self.outfile_pattern_national)
        out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
        logger.debug("Output file path = %s", out_filepath)

        # National filtering:
        af_shapeff.fires_filtering(self.shp_borders)

        # Metadata should be transfered here!
        afdata_ff = af_shapeff.get_af_data()

        if len(afdata_ff) > 0:
            # False-alarm mask: drop detections *inside* the mask polygons.
            logger.debug("Doing the fires filtering: shapefile-mask = %s", str(self.shp_filtermask))
            af_shapeff.fires_filtering(self.shp_filtermask, start_geometries_index=0, inside=False)
            afdata_ff = af_shapeff.get_af_data()
            logger.debug("After fires_filtering: Number of fire detections left: %d", len(afdata_ff))

        return afdata_ff

    def get_output_messages(self, filepath, msg, number_of_data, sweref99=False):
        """Generate the adequate output message(s) depending on if an output file was created or not."""
        logger.info("Geojson file created! Number of fires = %d", number_of_data)
        return [self._generate_output_message(filepath, msg, sweref99=sweref99)]

    def _generate_output_message(self, filepath, input_msg, region=None, sweref99=False):
        """Create the output message to publish."""
        output_topic = generate_posttroll_topic(self.output_topic, region)
        to_send = prepare_posttroll_message(input_msg, region)
        to_send['uri'] = ('ssh://%s/%s' % (self.host, filepath))
        to_send['uid'] = os.path.basename(filepath)
        to_send['type'] = 'GEOJSON-filtered'
        to_send['format'] = 'geojson'
        if sweref99:
            to_send['product'] = 'afimg_sweref99'
        else:
            to_send['product'] = 'afimg'
        pubmsg = Message(output_topic, 'file', to_send)
        return pubmsg

    def _generate_no_fires_messages(self, input_msg, msg_string):
        """Create the output messages to publish."""
        to_send = prepare_posttroll_message(input_msg)
        to_send['info'] = msg_string
        publish_messages = []
        for ext in ['National', 'Regional']:
            topic = self.output_topic + '/' + ext
            publish_messages.append(Message(topic, 'info', to_send))

        return publish_messages

    def _check_borders_shapes_exists(self):
        """Check that the national borders shapefile exists on disk."""
        if not os.path.exists(self.shp_borders):
            raise OSError("Shape file does not exist! Filename = %s" % self.shp_borders)

    def _initialize_fire_detection_id(self):
        """Initialize the fire detection ID.

        Picks up the last id from the cache file when available, otherwise
        starts a fresh counter at 0 for the current (UTC) day.
        """
        if self.filepath_detection_id_cache and os.path.exists(self.filepath_detection_id_cache):
            self._fire_detection_id = self.get_id_from_file()
        else:
            self._fire_detection_id = {'date': datetime.utcnow(), 'counter': 0}

    def update_fire_detection_id(self):
        """Update the fire detection ID registry.

        Resets the counter when the UTC day has rolled over, then increments.
        """
        now = datetime.utcnow()
        tdelta = now - self._fire_detection_id['date']
        if tdelta.total_seconds() > 24*3600:
            self._initialize_fire_detection_id()
        elif tdelta.total_seconds() > 0 and self._fire_detection_id['date'].day != now.day:
            self._initialize_fire_detection_id()

        self._fire_detection_id['counter'] = self._fire_detection_id['counter'] + 1

    def save_id_to_file(self):
        """Save the (current) detection id on disk.

        It is assumed that the user permissions are so that a file can actually
        be written to disk here!
        """
        with open(self.filepath_detection_id_cache, 'w') as fpt:
            id_ = self._create_id_string()
            fpt.write(id_)

    def get_id_from_file(self):
        """Read the latest stored detection id string from disk and convert to internal format."""
        with open(self.filepath_detection_id_cache, 'r') as fpt:
            idstr = fpt.read()

        return self._get_id_from_string(idstr)

    def _get_id_from_string(self, idstr):
        """Get the detection id from string, e.g. '20230816-4' -> {date, counter}."""
        datestr, counter = idstr.split('-')
        return {'date': datetime.strptime(datestr, '%Y%m%d'),
                'counter': int(counter)}

    def _create_id_string(self):
        """From the internal fire detection id create the id string to be exposed to the user."""
        return (self._fire_detection_id['date'].strftime('%Y%m%d') +
                '-' + str(self._fire_detection_id['counter']))

    def add_unique_day_id(self, afdata):
        """Add a unique detection id - date + a running number for the day."""
        # Add id's to the detections:
        id_list = []
        for _i in range(len(afdata)):
            self.update_fire_detection_id()
            id_ = self._create_id_string()
            id_list.append(id_)

        afdata['detection_id'] = id_list
        return afdata

    def close(self):
        """Shutdown the Active Fires postprocessing."""
        logger.info('Terminating Active Fires post processing.')
        self.loop = False
        logger.info('Dumping the latest detection id to disk: %s', str(self.filepath_detection_id_cache))
        self.save_id_to_file()
        try:
            self.listener.stop()
        except Exception:
            logger.exception("Couldn't stop listener.")
        if self.publisher:
            try:
                self.publisher.stop()
            except Exception:
                logger.exception("Couldn't stop publisher.")
664

665

666
def check_file_type_okay(file_type):
    """Check if the file is of the correct type (a txt file)."""
    if file_type in ['txt', 'TXT']:
        return True

    logger.info('File type not txt: %s', str(file_type))
    return False
672

673

674
def get_filename_from_uri(uri):
    """Extract the local file path component from the uri given."""
    logger.info('File uri: %s', str(uri))
    return urlparse(uri).path
679

680

681
def generate_posttroll_topic(output_topic, region=None):
    """Create the topic for the posttroll message to publish.

    Without a region the '/National' suffix is used; with a region the topic
    carries '/Regional/' plus the region code ('Kod_omr' attribute).
    """
    if not region:
        return output_topic + '/National'

    return output_topic + '/Regional/' + region['attributes']['Kod_omr']
689

690

691
def prepare_posttroll_message(input_msg, region=None):
    """Create the basic posttroll-message fields and return.

    Copies the incoming message data, strips the file-specific fields and,
    when a region is given, adds its name and code.
    """
    to_send = input_msg.data.copy()
    for stale_key in ('dataset', 'collection', 'uri', 'uid', 'format', 'type'):
        to_send.pop(stale_key, None)

    # FIXME! Check that the region_name is stored as a unicode string!
    if region:
        to_send['region_name'] = region['attributes']['Testomr']
        to_send['region_code'] = region['attributes']['Kod_omr']

    return to_send
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc