adybbroe / activefires-pp / build 6110862147

07 Sep 2023 02:10PM UTC coverage: 83.593% (+0.6%) from 83.035%

push · github · web-flow
Merge pull request #17 from adybbroe/sweref99_projection_output

Refactor and add sweref99 output

288 of 441 branches covered (65.31%)

246 of 269 new or added lines in 9 files covered. (91.45%)

1 existing line in 1 file now uncovered.

2089 of 2499 relevant lines covered (83.59%)

3.34 hits per line

Source File

/activefires_pp/post_processing.py (68.24% covered)
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3

4
# Copyright (c) 2021 - 2023 Adam.Dybbroe
5

6
# Author(s):
7

8
#   Adam Dybbroe <Firstname.Lastname@smhi.se>
9

10
# This program is free software: you can redistribute it and/or modify
11
# it under the terms of the GNU General Public License as published by
12
# the Free Software Foundation, either version 3 of the License, or
13
# (at your option) any later version.
14

15
# This program is distributed in the hope that it will be useful,
16
# but WITHOUT ANY WARRANTY; without even the implied warranty of
17
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18
# GNU General Public License for more details.
19

20
# You should have received a copy of the GNU General Public License
21
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
22

23
"""Post processing on the Active Fire detections."""
4✔
24

25
import socket
4✔
26
from trollsift import Parser, globify
4✔
27
import time
4✔
28
import pandas as pd
4✔
29
from datetime import datetime, timedelta
4✔
30
import numpy as np
4✔
31
import os
4✔
32
from six.moves.urllib.parse import urlparse
4✔
33

34
import logging
4✔
35
import signal
4✔
36
from queue import Empty
4✔
37
from threading import Thread
4✔
38
from posttroll.listener import ListenerContainer
4✔
39
from posttroll.message import Message
4✔
40
from posttroll.publisher import NoisyPublisher
4✔
41
import pyproj
4✔
42
from matplotlib.path import Path
4✔
43
import shapely
4✔
44

45
from activefires_pp.utils import datetime_utc2local
4✔
46
from activefires_pp.utils import UnitConverter
4✔
47
from activefires_pp.utils import get_local_timezone_offset
4✔
48
from activefires_pp.config import read_config
4✔
49
from activefires_pp.geometries_from_shapefiles import ShapeGeometry
4✔
50

51
from activefires_pp.geojson_utils import store_geojson
4✔
52
from activefires_pp.geojson_utils import geojson_feature_collection_from_detections
4✔
53
from activefires_pp.geojson_utils import map_coordinates_in_feature_collection
4✔
54

55

56
# M-band output:
57
# column 1: latitude of fire pixel (degrees)
58
# column 2: longitude of fire pixel (degrees)
59
# column 3: M13 brightness temperature of fire pixel (K)
60
# column 4: Along-scan fire pixel resolution (km)
61
# column 5: Along-track fire pixel resolution (km)
62
# column 6: detection confidence (%)
63
#           7=low, 8=nominal, 9=high (Imager Resolution)
64
#           0-100 % (Moderate Resolution)
65
# column 7: fire radiative power (MW)
66
# I-band output:
67
# column 1: latitude of fire pixel (degrees)
68
# column 2: longitude of fire pixel (degrees)
69
# column 3: I04 brightness temperature of fire pixel (K)
70
# column 4: Along-scan fire pixel resolution (km)
71
# column 5: Along-track fire pixel resolution (km)
72
# column 6: detection confidence ([7,8,9]->[lo,med,hi])
73
# column 7: fire radiative power (MW)
74
#
75
COL_NAMES = ["latitude", "longitude", "tb", "along_scan_res", "along_track_res", "conf", "power"]
4✔
76

77
NO_FIRES_TEXT = 'No fire detections for this granule'
4✔
78

79

80
logger = logging.getLogger(__name__)
4✔
81
logging.getLogger("fiona").setLevel(logging.WARNING)
4✔
82

83

84
class ActiveFiresShapefileFiltering(object):
4✔
85
    """Reading, filtering and writing Active Fire detections.
86

87
    Reading either the CSPP VIIRS AF output (txt files) or the Geojson formatted files.
88
    Filtering for static and false alarms, and/or simply on geographical regions.
89
    Data is stored in geojson format.
90
    """
91

92
    def __init__(self, filepath=None, afdata=None, platform_name=None, timezone='GMT'):
4✔
93
        """Initialize the ActiveFiresShapefileFiltering class."""
94
        self.input_filepath = filepath
4✔
95
        self._afdata = afdata
4✔
96
        if afdata is None:
4✔
97
            self.metadata = {}
4✔
98
        else:
99
            self.metadata = afdata.attrs
4✔
100

101
        self.timezone = timezone
4✔
102
        self.platform_name = platform_name
4✔
103

104
    def get_af_data(self, filepattern=None, localtime=True):
4✔
105
        """Read the Active Fire results from file - ascii formatted output from CSPP VIIRS-AF."""
106
        if self._afdata is not None:
4✔
107
            # Make sure the attrs are populated from the metadata instance attribute
108
            self._afdata.attrs.update(self.metadata)
4✔
109
            return self._afdata
4✔
110

111
        if not self.input_filepath or not os.path.exists(self.input_filepath):
4!
112
            # FIXME! Better to raise an exception!?
113
            return self._afdata
×
114

115
        if not filepattern:
4!
116
            raise AttributeError("file pattern must be provided in order to be able to read from file!")
×
117

118
        self.metadata = self._get_metadata_from_filename(filepattern)
4✔
119
        self._afdata = _read_data(self.input_filepath)
4✔
120
        self._add_start_and_end_time_to_active_fires_data(localtime)
4✔
121

122
        return self._afdata
4✔
123

124
    def _get_metadata_from_filename(self, infile_pattern):
4✔
125
        """From the filename retrieve the metadata such as satellite and sensing time."""
126
        return get_metadata_from_filename(infile_pattern, self.input_filepath)
4✔
127

128
    def _add_start_and_end_time_to_active_fires_data(self, localtime):
4✔
129
        """Add start and end time to active fires data."""
130
        if localtime:
4✔
131
            logger.info("Convert to local time zone!")
4✔
132
            starttime = datetime_utc2local(self.metadata['start_time'], self.timezone)
4✔
133
            endtime = datetime_utc2local(self.metadata['end_time'], self.timezone)
4✔
134
        else:
135
            starttime = datetime_utc2local(self.metadata['start_time'], 'GMT')
4✔
136
            endtime = datetime_utc2local(self.metadata['end_time'], 'GMT')
4✔
137

138
        starttime = starttime.replace(tzinfo=None)
4✔
139
        endtime = endtime.replace(tzinfo=None)
4✔
140

141
        self._afdata['starttime'] = np.repeat(starttime, len(self._afdata)).astype(np.datetime64)
4✔
142
        self._afdata['endtime'] = np.repeat(endtime, len(self._afdata)).astype(np.datetime64)
4✔
143

144
        logger.info('Start and end times: %s %s',
4✔
145
                    str(self._afdata['starttime'][0]),
146
                    str(self._afdata['endtime'][0]))
147

148
    def _apply_timezone_offset(self, obstime):
4✔
149
        """Apply the time zone offset to the datetime objects."""
150
        obstime_offset = get_local_timezone_offset(self.timezone)
×
151
        return np.repeat(obstime.replace(tzinfo=None) + obstime_offset,
×
152
                         len(self._afdata)).astype(np.datetime64)
153

154
    def fires_filtering(self, shapefile, start_geometries_index=1, inside=True):
4✔
155
        """Remove fires outside National borders or filter out potential false detections.
156

157
        If *inside* is True the filtering will keep those detections that are inside the polygon.
158
        If *inside* is False the filtering will disregard the detections that are inside the polygon.
159
        """
160
        detections = self._afdata
4✔
161

162
        lons = detections.longitude.values
4✔
163
        lats = detections.latitude.values
4✔
164

165
        toc = time.time()
4✔
166
        points_inside = get_global_mask_from_shapefile(shapefile, (lons, lats), start_geometries_index)
4✔
167
        logger.debug("Time used checking inside polygon - mpl path method: %f", time.time() - toc)
4✔
168

169
        self._afdata = detections[points_inside == inside]
4✔
170

171
        if len(self._afdata) == 0:
4!
172
            logger.debug("No fires after filtering on Polygon...")
×
173
        else:
174
            logger.debug("Number of detections after filtering on Polygon: %d", len(self._afdata))
4✔
175

176
    def get_regional_filtermasks(self, shapefile, globstr):
4✔
177
        """Get the regional filter masks from the shapefile."""
178
        detections = self._afdata
×
179

180
        lons = detections.longitude.values
×
181
        lats = detections.latitude.values
×
182

183
        logger.debug("Before ShapeGeometry instance - shapefile name = %s" % str(shapefile))
×
184
        logger.debug("Shape file glob-string = %s" % str(globstr))
×
185
        shape_geom = ShapeGeometry(shapefile, globstr)
×
186
        shape_geom.load()
×
187

188
        p__ = pyproj.Proj(shape_geom.proj4str)
×
189
        metersx, metersy = p__(lons, lats)
×
190
        points = np.vstack([metersx, metersy]).T
×
191

192
        regional_masks = {}
×
193

194
        for attr, geometry in zip(shape_geom.attributes, shape_geom.geometries):
×
195
            test_omr = attr['Testomr']
×
196
            all_inside_test_omr = False
×
197
            some_inside_test_omr = False
×
198
            logger.debug(u'Test area: {}'.format(str(test_omr)))
×
199

200
            regional_masks[test_omr] = {'mask': None, 'attributes': attr}
×
201

202
            if isinstance(geometry, shapely.geometry.multipolygon.MultiPolygon):
×
203
                regional_masks[test_omr]['mask'] = get_mask_from_multipolygon(points, geometry)
×
204
            else:
205
                shape = geometry
×
206
                pth = Path(shape.exterior.coords)
×
207
                regional_masks[test_omr]['mask'] = pth.contains_points(points)
×
208

209
            if sum(regional_masks[test_omr]['mask']) == len(points):
×
210
                all_inside_test_omr = True
×
211
                some_inside_test_omr = True
×
212
                logger.debug("All points inside test area!")
×
213
            elif sum(regional_masks[test_omr]['mask']) > 0:
×
214
                some_inside_test_omr = True
×
215
                logger.debug("Some points inside test area!")
×
216

217
            regional_masks[test_omr]['all_inside_test_area'] = all_inside_test_omr
×
218
            regional_masks[test_omr]['some_inside_test_area'] = some_inside_test_omr
×
219

220
        return regional_masks
×
221

222

223
def _read_data(filepath):
4✔
224
    """Read the AF data."""
225
    with open(filepath, 'r') as fpt:
×
226
        return pd.read_csv(fpt, index_col=None, header=None, comment='#', names=COL_NAMES)
×
227

228

229
def get_metadata_from_filename(infile_pattern, filepath):
4✔
230
    """From the filename and its pattern get basic metadata of the satellite observations."""
231
    p__ = Parser(infile_pattern)
4✔
232
    fname = os.path.basename(filepath)
4✔
233
    try:
4✔
234
        res = p__.parse(fname)
4✔
235
    except ValueError:
×
236
        # Do something!
237
        return None
×
238

239
    # Fix the end time:
240
    endtime = datetime(res['start_time'].year, res['start_time'].month,
4✔
241
                       res['start_time'].day, res['end_hour'].hour, res['end_hour'].minute,
242
                       res['end_hour'].second)
243
    if endtime < res['start_time']:
4!
244
        endtime = endtime + timedelta(days=1)
×
245

246
    res['end_time'] = endtime
4✔
247

248
    return res
4✔
249

250

251
def store(output_filename, detections):
4✔
252
    """Store the filtered AF detections on disk."""
253
    if len(detections) > 0:
×
254
        detections.to_csv(output_filename, index=False)
×
255
        return output_filename
×
256
    else:
257
        logger.debug("No detections to save!")
×
258
        return None
×
259

260

261
def get_mask_from_multipolygon(points, geometry, start_idx=1):
4✔
262
    """Get mask for points from a shapely Multipolygon."""
263
    shape = geometry.geoms[0]
4✔
264
    pth = Path(shape.exterior.coords)
4✔
265
    mask = pth.contains_points(points)
4✔
266

267
    if sum(mask) == len(points):
4!
268
        return mask
×
269

270
    constituent_part = geometry.geoms[start_idx:]
4✔
271
    for shape in constituent_part.geoms:
4✔
272
        pth = Path(shape.exterior.coords)
4✔
273
        mask = np.logical_or(mask, pth.contains_points(points))
4✔
274
        if sum(mask) == len(points):
4✔
275
            break
4✔
276

277
    return mask
4✔
278

279

280
def get_global_mask_from_shapefile(shapefile, lonlats, start_geom_index=0):
4✔
281
    """Given geographical (lon,lat) points get a mask to apply when filtering."""
282
    lons, lats = lonlats
4✔
283
    logger.debug("Getting the global mask from file: shapefile file path = %s", str(shapefile))
4✔
284
    shape_geom = ShapeGeometry(shapefile)
4✔
285
    shape_geom.load()
4✔
286

287
    p__ = pyproj.Proj(shape_geom.proj4str)
4✔
288

289
    # There is only one geometry/multi-polygon!
290
    geometry = shape_geom.geometries[0]
4✔
291

292
    metersx, metersy = p__(lons, lats)
4✔
293
    points = np.vstack([metersx, metersy]).T
4✔
294

295
    return get_mask_from_multipolygon(points, geometry, start_geom_index)
4✔
296

297

298
class ActiveFiresPostprocessing(Thread):
4✔
299
    """The active fires post processor."""
300

301
    def __init__(self, configfile, shp_borders, shp_mask, regional_filtermask=None):
4✔
302
        """Initialize the active fires post processor class."""
303
        super().__init__()
4✔
304
        self.shp_borders = shp_borders
4✔
305
        self.shp_filtermask = shp_mask
4✔
306

307
        self.regional_filtermask = regional_filtermask
4✔
308
        self.configfile = configfile
4✔
309
        self.options = {}
4✔
310

311
        config = read_config(self.configfile)
4✔
312
        self._set_options_from_config(config)
4✔
313

314
        self.host = socket.gethostname()
4✔
315
        self.timezone = self.options.get('timezone', 'GMT')
4✔
316

317
        self.input_topic = self.options['subscribe_topics'][0]
4✔
318
        self.output_topic = self.options['publish_topic']
4✔
319
        self.infile_pattern = self.options.get('af_pattern_ibands')
4✔
320

321
        self.outfile_patterns_national = config.get('output').get('national')
4✔
322
        self.outfile_patterns_regional = config.get('output').get('regional')
4✔
323

324
        self.output_dir = self.options.get('output_dir', '/tmp')
4✔
325

326
        self.filepath_detection_id_cache = self.options.get('filepath_detection_id_cache')
4✔
327

328
        frmt = self.options['regional_shapefiles_format']
4✔
329
        self.regional_shapefiles_globstr = globify(frmt)
4✔
330

331
        self._fire_detection_id = None
4✔
332
        self._initialize_fire_detection_id()
4✔
333

334
        self.listener = None
4✔
335
        self.publisher = None
4✔
336
        self.loop = False
4✔
337
        self._setup_and_start_communication()
4✔
338

339
    def _setup_and_start_communication(self):
4✔
340
        """Set up the Posttroll communication and start the publisher."""
341
        logger.debug("Starting up... Input topic: %s", self.input_topic)
×
342
        now = datetime_utc2local(datetime.now(), self.timezone)
×
343
        logger.debug("Output times for timezone: {zone} Now = {time}".format(zone=str(self.timezone), time=now))
×
344

345
        tic = time.time()
×
346
        units = {'temperature': 'degC'}
×
347
        self.unit_converter = UnitConverter(units)
×
348
        logger.debug("Unit conversion initialization with Pint took %f seconds.", time.time() - tic)
×
349

350
        self._check_borders_shapes_exists()
×
351

352
        self.listener = ListenerContainer(topics=[self.input_topic])
×
353
        self.publisher = NoisyPublisher("active_fires_postprocessing")
×
354
        self.publisher.start()
×
355
        self.loop = True
×
356
        signal.signal(signal.SIGTERM, self.signal_shutdown)
×
357

358
    def _set_options_from_config(self, config):
4✔
359
        """From the configuration on disk set the option dictionary, holding all metadata for processing."""
360
        for item in config:
4✔
361
            if not isinstance(config[item], dict):
4✔
362
                self.options[item] = config[item]
4✔
363

364
        if isinstance(self.options.get('subscribe_topics'), str):
4!
365
            subscribe_topics = self.options.get('subscribe_topics').split(',')
4✔
366
            for item in subscribe_topics:
4✔
367
                if len(item) == 0:
4!
368
                    subscribe_topics.remove(item)
×
369
            self.options['subscribe_topics'] = subscribe_topics
4✔
370

371
        if isinstance(self.options.get('publish_topics'), str):
4!
372
            publish_topics = self.options.get('publish_topics').split(',')
×
373
            for item in publish_topics:
×
374
                if len(item) == 0:
×
375
                    publish_topics.remove(item)
×
376
            self.options['publish_topics'] = publish_topics
×
377

378
    def signal_shutdown(self, *args, **kwargs):
4✔
379
        """Shutdown the Active Fires postprocessing."""
380
        self.close()
×
381

382
    def check_incoming_message_and_get_filename(self, msg):
4✔
383
        """Check the message content and return filename if okay."""
384
        if msg.type not in ['file', 'collection', 'dataset']:
4!
385
            logger.debug("Message type not supported: %s", str(msg.type))
×
386
            return None
×
387

388
        filename = get_filename_from_uri(msg.data.get('uri'))
4✔
389
        if not os.path.exists(filename):
4✔
390
            logger.warning("File does not exist: %s", filename)
4✔
391
            return None
4✔
392

393
        file_ok = check_file_type_okay(msg.data.get('type'))
4✔
394
        if not file_ok:
4✔
395
            output_messages = self._generate_no_fires_messages(msg, NO_FIRES_TEXT)
4✔
396
            for output_msg in output_messages:
4✔
397
                logger.debug("Sending message: %s", str(output_msg))
4✔
398
                self.publisher.send(str(output_msg))
4✔
399
            return None
4✔
400

401
        return filename
4✔
402

403
    def get_output_filepath_from_projname(self, projname, metadata):
4✔
404
        """From the projection-name (given in the config file) retrieve the output file path."""
405
        try:
4✔
406
            pout = Parser(self.outfile_patterns_national[projname]['geojson_file_pattern'])
4✔
407
        except KeyError:
4✔
408
            raise KeyError('Projection name %s not supported in configuration!' % projname)
4✔
409

410
        return os.path.join(self.output_dir, pout.compose(metadata))
4✔
411

412
    def _national_save_and_publish(self, feature_collection, ndata, af_shapeff, msg, projname='default'):
4✔
413
        """Take the fearure collection and store the results in a Geojson file and publish."""
NEW
414
        if feature_collection is None:
×
NEW
415
            logger.info("No geojson file created, number of fires after filtering = %d", ndata)
×
NEW
416
            output_messages = self._generate_no_fires_messages(msg,
×
417
                                                               'No true fire detections inside National borders')  # noqa
NEW
418
            return
×
419

NEW
420
        out_filepath = self.get_output_filepath_from_projname(projname, af_shapeff.metadata)
×
NEW
421
        logger.debug("Output file path = %s", out_filepath)
×
422

NEW
423
        store_geojson(out_filepath, feature_collection)
×
NEW
424
        output_messages = self.get_output_messages(out_filepath, msg, ndata, proj_name=projname)
×
425

NEW
426
        for output_msg in output_messages:
×
NEW
427
            if output_msg:
×
NEW
428
                logger.debug("Sending message: %s", str(output_msg))
×
NEW
429
                self.publisher.send(str(output_msg))
×
430

431
    def do_postprocessing_on_message(self, msg, filename):
4✔
432
        """Do the fires post processing on a message."""
433
        platform_name = msg.data.get('platform_name')
×
434
        af_shapeff = ActiveFiresShapefileFiltering(filename, platform_name=platform_name,
×
435
                                                   timezone=self.timezone)
436
        afdata = af_shapeff.get_af_data(self.infile_pattern)
×
437
        if len(afdata) == 0:
×
438
            output_messages = self._generate_no_fires_messages(msg, NO_FIRES_TEXT)
×
439
            for output_msg in output_messages:
×
440
                logger.debug("Sending message: %s", str(output_msg))
×
441
                self.publisher.send(str(output_msg))
×
442
            return
×
443

444
        afdata = self.fires_filtering(msg, af_shapeff)
×
445
        logger.debug("After fires_filtering...: Number of fire detections = %d", len(afdata))
×
446
        if len(afdata) == 0:
×
447
            logger.debug("No fires - so no regional filtering to be done!")
×
448
            return
×
449

450
        afdata = self.add_unique_day_id(afdata)
×
451
        self.save_id_to_file()
×
UNCOV
452
        afdata = self.add_tb_celcius(afdata)
×
453

454
        # 1) Create geojson feature collection
455
        # 2) Dump geojson data to disk
456

NEW
457
        feature_collection = geojson_feature_collection_from_detections(afdata,
×
458
                                                                        platform_name=af_shapeff.platform_name)
459

NEW
460
        for proj_name in self.outfile_patterns_national:
×
NEW
461
            if proj_name == "default":
×
NEW
462
                self._national_save_and_publish(feature_collection, len(afdata), af_shapeff, msg)
×
463
            else:
NEW
464
                epsg_str = self.outfile_patterns_national[proj_name].get('projection')
×
NEW
465
                other_fc = map_coordinates_in_feature_collection(feature_collection, epsg_str)
×
NEW
466
                self._national_save_and_publish(other_fc, len(afdata), af_shapeff, msg, proj_name)
×
467

468
        # Do the regional filtering now:
469
        if not self.regional_filtermask:
×
470
            logger.info("No regional filtering is attempted.")
×
471
            return
×
472

473
        # FIXME! If afdata is empty (len=0) then it seems all data are inside all regions!
474
        af_shapeff = ActiveFiresShapefileFiltering(afdata=afdata, platform_name=platform_name,
×
475
                                                   timezone=self.timezone)
476
        regional_fmask = af_shapeff.get_regional_filtermasks(self.regional_filtermask,
×
477
                                                             globstr=self.regional_shapefiles_globstr)
478
        regional_messages = self.regional_fires_filtering_and_publishing(msg, regional_fmask, af_shapeff)
×
479
        for region_msg in regional_messages:
×
480
            logger.debug("Sending message: %s", str(region_msg))
×
481
            self.publisher.send(str(region_msg))
×
482

483
    def run(self):
4✔
484
        """Run the AF post processing."""
485
        while self.loop:
×
486
            try:
×
487
                msg = self.listener.output_queue.get(timeout=1)
×
488
                logger.debug("Message: %s", str(msg.data))
×
489
            except Empty:
×
490
                continue
×
491
            else:
492
                filename = self.check_incoming_message_and_get_filename(msg)
×
493
                if not filename:
×
494
                    continue
×
495

496
                self.do_postprocessing_on_message(msg, filename)
×
497

498
    def regional_fires_filtering_and_publishing(self, msg, regional_fmask, afsff_obj):
4✔
499
        """From the regional-fires-filter-mask and the fire detection data send regional messages."""
500
        logger.debug("Perform regional masking on VIIRS AF detections and publish accordingly.")
4✔
501

502
        afdata = afsff_obj.get_af_data()
4✔
503
        fmda = afsff_obj.metadata
4✔
504

505
        fmda['platform'] = afsff_obj.platform_name
4✔
506

507
        pout = Parser(self.outfile_patterns_regional['default']['geojson_file_pattern'])
4✔
508

509
        output_messages = []
4✔
510
        regions_with_detections = 0
4✔
511
        for region_name in regional_fmask:
4✔
512
            if not regional_fmask[region_name]['some_inside_test_area']:
4✔
513
                continue
4✔
514

515
            regions_with_detections = regions_with_detections + 1
4✔
516
            fmda['region_name'] = regional_fmask[region_name]['attributes']['Kod_omr']
4✔
517

518
            out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
4✔
519
            logger.debug("Output file path = %s", out_filepath)
4✔
520
            data_in_region = afdata[regional_fmask[region_name]['mask']]
4✔
521

522
            try:
4✔
523
                feature_collection = geojson_feature_collection_from_detections(data_in_region,
4✔
524
                                                                                platform_name=fmda['platform'])
525
            except ValueError:
×
526
                logger.warning("Something wrong happended storing regional " +
×
527
                               "data to Geojson - area: {name}".format(name=str(region_name)))
528
                continue
×
529

530
            store_geojson(out_filepath, feature_collection)
4✔
531

532
            outmsg = self._generate_output_message(out_filepath, msg, 'default',
4✔
533
                                                   regional_fmask[region_name])
534
            output_messages.append(outmsg)
4✔
535
            logger.info("Geojson file created! Number of fires in region = %d", len(data_in_region))
4✔
536

537
        logger.debug("Regional masking done. Number of regions with fire " +
4✔
538
                     "detections on this granule: %s" % str(regions_with_detections))
539
        return output_messages
4✔
540

541
    def fires_filtering(self, msg, af_shapeff):
4✔
542
        """Read Active Fire data and perform spatial filtering removing false detections.
543

544
        Do the national filtering first, and then filter out potential false
545
        detections by the special mask for that.
546

547
        """
548
        logger.debug("Read VIIRS AF detections and perform quality control and spatial filtering")
4✔
549

550
        fmda = af_shapeff.metadata
4✔
551
        # metadata contains time and everything, but it is not being transferred to the dataframe.attrs
552

553
        pout = Parser(self.outfile_patterns_national['default']['geojson_file_pattern'])
4✔
554
        out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
4✔
555
        logger.debug("Output file path = %s", out_filepath)
4✔
556

557
        # National filtering:
558
        af_shapeff.fires_filtering(self.shp_borders)
4✔
559

560
        # Metadata should be transferred here!
561
        afdata_ff = af_shapeff.get_af_data()
4✔
562

563
        if len(afdata_ff) > 0:
4!
564
            logger.debug("Doing the fires filtering: shapefile-mask = %s", str(self.shp_filtermask))
4✔
565
            af_shapeff.fires_filtering(self.shp_filtermask, start_geometries_index=0, inside=False)
4✔
566
            afdata_ff = af_shapeff.get_af_data()
4✔
567
            logger.debug("After fires_filtering: Number of fire detections left: %d", len(afdata_ff))
4✔
568

569
        return afdata_ff
4✔
570

571
    def get_output_messages(self, filepath, msg, number_of_data, proj_name='default'):
4✔
572
        """Generate the adequate output message(s) depending on if an output file was created or not."""
573
        logger.info("Geojson file created! Number of fires = %d", number_of_data)
4✔
574
        return [self._generate_output_message(filepath, msg, proj_name)]
4✔
575

576
    def _generate_output_message(self, filepath, input_msg, proj_name, region=None):
4✔
577
        """Create the output message to publish."""
578
        output_topic = generate_posttroll_topic(self.output_topic, region)
4✔
579
        to_send = prepare_posttroll_message(input_msg, region)
4✔
580
        to_send['uri'] = str(filepath)
4✔
581
        to_send['uid'] = os.path.basename(filepath)
4✔
582
        to_send['type'] = 'GEOJSON-filtered'
4✔
583
        to_send['format'] = 'geojson'
4✔
584
        if proj_name == 'default':
4✔
585
            to_send['product'] = 'afimg'
4✔
586
        else:
587
            to_send['product'] = 'afimg_{proj_name}'.format(proj_name=proj_name)
4✔
588

589
        pubmsg = Message(output_topic, 'file', to_send)
4✔
590
        return pubmsg
4✔
591

592
    def _generate_no_fires_messages(self, input_msg, msg_string):
4✔
593
        """Create the output messages to publish."""
594
        to_send = prepare_posttroll_message(input_msg)
4✔
595
        to_send['info'] = msg_string
4✔
596
        publish_messages = []
4✔
597
        for ext in ['National', 'Regional']:
4✔
598
            topic = self.output_topic + '/' + ext
4✔
599
            publish_messages.append(Message(topic, 'info', to_send))
4✔
600

601
        return publish_messages
4✔
602

603
    def _check_borders_shapes_exists(self):
4✔
604
        """Check that the national borders shapefile exists on disk."""
605
        if not os.path.exists(self.shp_borders):
4✔
606
            raise OSError("Shape file does not exist! Filename = %s" % self.shp_borders)
4✔
607

608
    def _initialize_fire_detection_id(self):
4✔
609
        """Initialize the fire detection ID."""
610
        if self.filepath_detection_id_cache and os.path.exists(self.filepath_detection_id_cache):
4✔
611
            self._fire_detection_id = self.get_id_from_file()
4✔
612
        else:
613
            self._fire_detection_id = {'date': datetime.utcnow(), 'counter': 0}
4✔
614

615
    def update_fire_detection_id(self):
4✔
616
        """Update the fire detection ID registry."""
617
        now = datetime.utcnow()
4✔
618
        tdelta = now - self._fire_detection_id['date']
4✔
619
        if tdelta.total_seconds() > 24*3600:
4✔
620
            self._initialize_fire_detection_id()
4✔
621
        elif tdelta.total_seconds() > 0 and self._fire_detection_id['date'].day != now.day:
4✔
622
            self._initialize_fire_detection_id()
4✔
623

624
        self._fire_detection_id['counter'] = self._fire_detection_id['counter'] + 1
4✔
625

626
    def save_id_to_file(self):
4✔
627
        """Save the (current) detection id on disk.
628

629
        It is assumed that the user's permissions allow a file to actually
630
        be written to disk here!
631
        """
632
        with open(self.filepath_detection_id_cache, 'w') as fpt:
4✔
633
            id_ = self._create_id_string()
4✔
634
            fpt.write(id_)
4✔
635

636
    def get_id_from_file(self):
4✔
637
        """Read the latest stored detection id string from disk and convert to internal format."""
638
        with open(self.filepath_detection_id_cache, 'r') as fpt:
4✔
639
            idstr = fpt.read()
4✔
640

641
        return self._get_id_from_string(idstr)
4✔
642

643
    def _get_id_from_string(self, idstr):
4✔
644
        """Get the detection id from string."""
645
        datestr, counter = idstr.split('-')
4✔
646
        return {'date': datetime.strptime(datestr, '%Y%m%d'),
4✔
647
                'counter': int(counter)}
648

649
    def _create_id_string(self):
4✔
650
        """From the internal fire detection id create the id string to be exposed to the user."""
651
        return (self._fire_detection_id['date'].strftime('%Y%m%d') +
4✔
652
                '-' + str(self._fire_detection_id['counter']))
653

654
    def add_unique_day_id(self, data_frame):
4✔
655
        """Add a unique detection id - date + a running number for the day."""
656
        # Add id's to the detections:
657
        id_list = []
4✔
658
        for _i in range(len(data_frame)):
4✔
659
            self.update_fire_detection_id()
4✔
660
            id_ = self._create_id_string()
4✔
661
            id_list.append(id_)
4✔
662

663
        col = len(data_frame.columns)
4✔
664
        data_frame.insert(col, 'detection_id', id_list)
4✔
665
        return data_frame
4✔
666

667
    def add_tb_celcius(self, data_frame):
4✔
668
        """Add a column with TB in Celcius to the fire detection data frame."""
669
        def kelvin2celcius(x):
4✔
670
            tb_c = self.unit_converter.convert('temperature', x)
4✔
671
            return tb_c.magnitude
4✔
672

673
        col = data_frame.columns.get_loc('tb')
4✔
674
        data_frame.insert(2, "tb_celcius", data_frame.iloc[:, col].apply(kelvin2celcius))
4✔
675
        return data_frame
4✔
676

677
    def close(self):
4✔
678
        """Shutdown the Active Fires postprocessing."""
679
        logger.info('Terminating Active Fires post processing.')
×
680
        self.loop = False
×
681
        logger.info('Dumping the latest detection id to disk: %s', str(self.filepath_detection_id_cache))
×
682
        self.save_id_to_file()
×
683
        try:
×
684
            self.listener.stop()
×
685
        except Exception:
×
686
            logger.exception("Couldn't stop listener.")
×
687
        if self.publisher:
×
688
            try:
×
689
                self.publisher.stop()
×
690
            except Exception:
×
691
                logger.exception("Couldn't stop publisher.")
×
692

693

694
def check_file_type_okay(file_type):
4✔
695
    """Check if the file is of the correct type."""
696
    if file_type not in ['txt', 'TXT']:
4✔
697
        logger.info('File type not txt: %s', str(file_type))
4✔
698
        return False
4✔
699
    return True
4✔
700

701

702
def get_filename_from_uri(uri):
4✔
703
    """Get the file name from the uri given."""
704
    logger.info('File uri: %s', str(uri))
4✔
705
    url = urlparse(uri)
4✔
706
    return url.path
4✔
707

708

709
def generate_posttroll_topic(output_topic, region=None):
4✔
710
    """Create the topic for the posttroll message to publish."""
711
    if region:
4✔
712
        output_topic = output_topic + '/Regional/' + region['attributes']['Kod_omr']
4✔
713
    else:
714
        output_topic = output_topic + '/National'
4✔
715

716
    return output_topic
4✔
717

718

719
def prepare_posttroll_message(input_msg, region=None):
4✔
720
    """Create the basic posttroll-message fields and return."""
721
    to_send = input_msg.data.copy()
4✔
722
    to_send.pop('dataset', None)
4✔
723
    to_send.pop('collection', None)
4✔
724
    to_send.pop('uri', None)
4✔
725
    to_send.pop('uid', None)
4✔
726
    to_send.pop('format', None)
4✔
727
    to_send.pop('type', None)
4✔
728
    # FIXME! Check that the region_name is stored as a unicode string!
729
    if region:
4✔
730
        to_send['region_name'] = region['attributes']['Testomr']
4✔
731
        to_send['region_code'] = region['attributes']['Kod_omr']
4✔
732

733
    return to_send
4✔
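
A minimal usage sketch of the class above, for orientation only: it shows how the ActiveFiresPostprocessing thread defined in this file might be started and shut down. The configuration and shapefile paths are hypothetical placeholders, and the actual package may wire this up differently in its own runner script.

# Usage sketch (not part of post_processing.py); all paths below are hypothetical.
from activefires_pp.post_processing import ActiveFiresPostprocessing

if __name__ == "__main__":
    afpp = ActiveFiresPostprocessing(
        "/path/to/activefires_pp_config.yaml",   # config file parsed by read_config()
        "/path/to/national_borders.shp",         # shapefile used for the national filtering
        "/path/to/false_alarm_mask.shp",         # shapefile masking out known false detections
        regional_filtermask="/path/to/regional_areas.shp",  # optional regional shapefiles
    )
    try:
        afpp.start()   # Thread.start() -> run(): listen for messages, filter, publish GeoJSON
        afpp.join()
    except KeyboardInterrupt:
        afpp.close()   # stop the loop, dump the detection-id cache, stop listener and publisher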