• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

adybbroe / activefires-pp / 6100871927

06 Sep 2023 06:16PM UTC coverage: 83.327% (+0.3%) from 83.035%
6100871927

Pull #17

github

web-flow
Merge 6f5ece530 into 06a6c995e
Pull Request #17: Refactor and add sweref99 output

286 of 441 branches covered (0.0%)

224 of 251 new or added lines in 9 files covered. (89.24%)

1 existing line in 1 file now uncovered.

2069 of 2483 relevant lines covered (83.33%)

3.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.19
/activefires_pp/post_processing.py
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3

4
# Copyright (c) 2021 - 2023 Adam.Dybbroe
5

6
# Author(s):
7

8
#   Adam Dybbroe <Firstname.Lastname@smhi.se>
9

10
# This program is free software: you can redistribute it and/or modify
11
# it under the terms of the GNU General Public License as published by
12
# the Free Software Foundation, either version 3 of the License, or
13
# (at your option) any later version.
14

15
# This program is distributed in the hope that it will be useful,
16
# but WITHOUT ANY WARRANTY; without even the implied warranty of
17
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18
# GNU General Public License for more details.
19

20
# You should have received a copy of the GNU General Public License
21
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
22

23
"""Post processing on the Active Fire detections."""
4✔
24

25
import socket
4✔
26
from trollsift import Parser, globify
4✔
27
import time
4✔
28
import pandas as pd
4✔
29
from datetime import datetime, timedelta
4✔
30
import numpy as np
4✔
31
import os
4✔
32
from six.moves.urllib.parse import urlparse
4✔
33

34
import logging
4✔
35
import signal
4✔
36
from queue import Empty
4✔
37
from threading import Thread
4✔
38
from posttroll.listener import ListenerContainer
4✔
39
from posttroll.message import Message
4✔
40
from posttroll.publisher import NoisyPublisher
4✔
41
import pyproj
4✔
42
from matplotlib.path import Path
4✔
43
import shapely
4✔
44

45
from activefires_pp.utils import datetime_utc2local
4✔
46
from activefires_pp.utils import UnitConverter
4✔
47
from activefires_pp.utils import get_local_timezone_offset
4✔
48
from activefires_pp.config import read_config
4✔
49
from activefires_pp.geometries_from_shapefiles import ShapeGeometry
4✔
50

51
from activefires_pp.geojson_utils import store_geojson
4✔
52
from activefires_pp.geojson_utils import geojson_feature_collection_from_detections
4✔
53
from activefires_pp.geojson_utils import map_coordinates_in_feature_collection
4✔
54

55

56
# M-band output:
57
# column 1: latitude of fire pixel (degrees)
58
# column 2: longitude of fire pixel (degrees)
59
# column 3: M13 brightness temperature of fire pixel (K)
60
# column 4: Along-scan fire pixel resolution (km)
61
# column 5: Along-track fire pixel resolution (km)
62
# column 6: detection confidence (%)
63
#           7=low, 8=nominal, 9=high (Imager Resolution)
64
#           0-100 % (Moderate Resolution)
65
# column 7: fire radiative power (MW)
66
# I-band output:
67
# column 1: latitude of fire pixel (degrees)
68
# column 2: longitude of fire pixel (degrees)
69
# column 3: I04 brightness temperature of fire pixel (K)
70
# column 4: Along-scan fire pixel resolution (km)
71
# column 5: Along-track fire pixel resolution (km)
72
# column 6: detection confidence ([7,8,9]->[lo,med,hi])
73
# column 7: fire radiative power (MW)
74
#
75
COL_NAMES = ["latitude", "longitude", "tb", "along_scan_res", "along_track_res", "conf", "power"]
4✔
76

77
NO_FIRES_TEXT = 'No fire detections for this granule'
4✔
78

79

80
logger = logging.getLogger(__name__)
4✔
81
logging.getLogger("fiona").setLevel(logging.WARNING)
4✔
82

83

84
class ActiveFiresShapefileFiltering(object):
    """Reading, filtering and writing Active Fire detections.

    Reading either the CSPP VIIRS AF output (txt files) or the Geojson formatted files.
    Filtering for static and false alarms, and/or simply on geographical regions.
    Data is stored in geojson format.
    """

    def __init__(self, filepath=None, afdata=None, platform_name=None, timezone='GMT'):
        """Initialize the ActiveFiresShapefileFiltering class.

        Either *filepath* (path to a CSPP VIIRS-AF ascii file) or *afdata*
        (an already loaded pandas DataFrame) is expected to be provided.
        """
        self.input_filepath = filepath
        self._afdata = afdata
        if afdata is None:
            self.metadata = {}
        else:
            # Carry over any metadata already attached to the DataFrame:
            self.metadata = afdata.attrs

        self.timezone = timezone
        self.platform_name = platform_name

    def get_af_data(self, filepattern=None, localtime=True):
        """Read the Active Fire results from file - ascii formatted output from CSPP VIIRS-AF.

        If data was already loaded (or passed to the constructor) it is
        returned as-is, with its attrs refreshed from *self.metadata*.
        Otherwise *filepattern* (a trollsift pattern) is required to extract
        metadata from the filename before reading the file from disk.
        """
        if self._afdata is not None:
            # Make sure the attrs are populated with metadata instance attribute
            self._afdata.attrs.update(self.metadata)
            return self._afdata

        if not self.input_filepath or not os.path.exists(self.input_filepath):
            # FIXME! Better to raise an exception!?
            return self._afdata

        if not filepattern:
            raise AttributeError("file pattern must be provided in order to be able to read from file!")

        self.metadata = self._get_metadata_from_filename(filepattern)
        self._afdata = _read_data(self.input_filepath)
        self._add_start_and_end_time_to_active_fires_data(localtime)

        return self._afdata

    def _get_metadata_from_filename(self, infile_pattern):
        """From the filename retrieve the metadata such as satellite and sensing time."""
        return get_metadata_from_filename(infile_pattern, self.input_filepath)

    def _add_start_and_end_time_to_active_fires_data(self, localtime):
        """Add start and end time columns to the active fires data.

        When *localtime* is True the (UTC) metadata times are converted to the
        configured time zone; otherwise they are kept as GMT. The resulting
        times are stored tz-naive, repeated for every detection row.
        """
        if localtime:
            logger.info("Convert to local time zone!")
            starttime = datetime_utc2local(self.metadata['start_time'], self.timezone)
            endtime = datetime_utc2local(self.metadata['end_time'], self.timezone)
        else:
            starttime = datetime_utc2local(self.metadata['start_time'], 'GMT')
            endtime = datetime_utc2local(self.metadata['end_time'], 'GMT')

        # Strip the tzinfo so the values can be cast to numpy datetime64:
        starttime = starttime.replace(tzinfo=None)
        endtime = endtime.replace(tzinfo=None)

        self._afdata['starttime'] = np.repeat(starttime, len(self._afdata)).astype(np.datetime64)
        self._afdata['endtime'] = np.repeat(endtime, len(self._afdata)).astype(np.datetime64)

        logger.info('Start and end times: %s %s',
                    str(self._afdata['starttime'][0]),
                    str(self._afdata['endtime'][0]))

    def _apply_timezone_offset(self, obstime):
        """Apply the time zone offset to the datetime objects.

        Returns the offset observation time repeated once per detection row.
        """
        obstime_offset = get_local_timezone_offset(self.timezone)
        return np.repeat(obstime.replace(tzinfo=None) + obstime_offset,
                         len(self._afdata)).astype(np.datetime64)

    def fires_filtering(self, shapefile, start_geometries_index=1, inside=True):
        """Remove fires outside National borders or filter out potential false detections.

        If *inside* is True the filtering will keep those detections that are inside the polygon.
        If *inside* is False the filtering will disregard the detections that are inside the polygon.
        """
        detections = self._afdata

        lons = detections.longitude.values
        lats = detections.latitude.values

        toc = time.time()
        points_inside = get_global_mask_from_shapefile(shapefile, (lons, lats), start_geometries_index)
        logger.debug("Time used checking inside polygon - mpl path method: %f", time.time() - toc)

        # Keep rows whose in-polygon status matches the requested *inside* flag:
        self._afdata = detections[points_inside == inside]

        if len(self._afdata) == 0:
            logger.debug("No fires after filtering on Polygon...")
        else:
            logger.debug("Number of detections after filtering on Polygon: %d", len(self._afdata))

    def get_regional_filtermasks(self, shapefile, globstr):
        """Get the regional filter masks from the shapefile.

        Returns a dict keyed on the shapefile 'Testomr' attribute. Each entry
        holds the boolean detection mask for that area, the shapefile
        attributes, and flags telling whether all/some detections fall inside.
        """
        detections = self._afdata

        lons = detections.longitude.values
        lats = detections.latitude.values

        logger.debug("Before ShapeGeometry instance - shapefile name = %s" % str(shapefile))
        logger.debug("Shape file glob-string = %s" % str(globstr))
        shape_geom = ShapeGeometry(shapefile, globstr)
        shape_geom.load()

        # Project the lon/lat detections into the shapefile's coordinate system:
        p__ = pyproj.Proj(shape_geom.proj4str)
        metersx, metersy = p__(lons, lats)
        points = np.vstack([metersx, metersy]).T

        regional_masks = {}

        for attr, geometry in zip(shape_geom.attributes, shape_geom.geometries):
            test_omr = attr['Testomr']
            all_inside_test_omr = False
            some_inside_test_omr = False
            logger.debug(u'Test area: {}'.format(str(test_omr)))

            regional_masks[test_omr] = {'mask': None, 'attributes': attr}

            if isinstance(geometry, shapely.geometry.multipolygon.MultiPolygon):
                regional_masks[test_omr]['mask'] = get_mask_from_multipolygon(points, geometry)
            else:
                # Single polygon - test against its exterior ring only:
                shape = geometry
                pth = Path(shape.exterior.coords)
                regional_masks[test_omr]['mask'] = pth.contains_points(points)

            if sum(regional_masks[test_omr]['mask']) == len(points):
                all_inside_test_omr = True
                some_inside_test_omr = True
                logger.debug("All points inside test area!")
            elif sum(regional_masks[test_omr]['mask']) > 0:
                some_inside_test_omr = True
                logger.debug("Some points inside test area!")

            regional_masks[test_omr]['all_inside_test_area'] = all_inside_test_omr
            regional_masks[test_omr]['some_inside_test_area'] = some_inside_test_omr

        return regional_masks

def _read_data(filepath):
    """Parse the CSPP VIIRS active fires ascii file into a pandas DataFrame.

    Comment lines (starting with '#') are skipped and the columns are named
    according to COL_NAMES.
    """
    with open(filepath, 'r') as af_file:
        frame = pd.read_csv(af_file, index_col=None, header=None, comment='#', names=COL_NAMES)
    return frame

228

229
def get_metadata_from_filename(infile_pattern, filepath):
    """From the filename and its pattern get basic metadata of the satellite observations.

    Returns the trollsift parse result enriched with an 'end_time' entry, or
    None when the filename does not match the pattern.
    """
    pattern_parser = Parser(infile_pattern)
    filename = os.path.basename(filepath)
    try:
        res = pattern_parser.parse(filename)
    except ValueError:
        # Filename does not match the pattern - nothing can be extracted.
        return None

    # The pattern only carries an end *hour*; combine it with the start date,
    # and roll over to the next day if the granule crosses midnight:
    start = res['start_time']
    endtime = datetime(start.year, start.month, start.day,
                       res['end_hour'].hour, res['end_hour'].minute,
                       res['end_hour'].second)
    if endtime < start:
        endtime += timedelta(days=1)

    res['end_time'] = endtime

    return res

250

251
def store(output_filename, detections):
4✔
252
    """Store the filtered AF detections on disk."""
253
    if len(detections) > 0:
×
254
        detections.to_csv(output_filename, index=False)
×
255
        return output_filename
×
256
    else:
257
        logger.debug("No detections to save!")
×
258
        return None
×
259

260

261
def get_mask_from_multipolygon(points, geometry, start_idx=1):
4✔
262
    """Get mask for points from a shapely Multipolygon."""
263
    shape = geometry.geoms[0]
4✔
264
    pth = Path(shape.exterior.coords)
4✔
265
    mask = pth.contains_points(points)
4✔
266

267
    if sum(mask) == len(points):
4!
268
        return mask
×
269

270
    constituent_part = geometry.geoms[start_idx:]
4✔
271
    for shape in constituent_part.geoms:
4✔
272
        pth = Path(shape.exterior.coords)
4✔
273
        mask = np.logical_or(mask, pth.contains_points(points))
4✔
274
        if sum(mask) == len(points):
4✔
275
            break
4✔
276

277
    return mask
4✔
278

279

280
def get_global_mask_from_shapefile(shapefile, lonlats, start_geom_index=0):
4✔
281
    """Given geographical (lon,lat) points get a mask to apply when filtering."""
282
    lons, lats = lonlats
4✔
283
    logger.debug("Getting the global mask from file: shapefile file path = %s", str(shapefile))
4✔
284
    shape_geom = ShapeGeometry(shapefile)
4✔
285
    shape_geom.load()
4✔
286

287
    p__ = pyproj.Proj(shape_geom.proj4str)
4✔
288

289
    # There is only one geometry/multi-polygon!
290
    geometry = shape_geom.geometries[0]
4✔
291

292
    metersx, metersy = p__(lons, lats)
4✔
293
    points = np.vstack([metersx, metersy]).T
4✔
294

295
    return get_mask_from_multipolygon(points, geometry, start_geom_index)
4✔
296

297

298
class ActiveFiresPostprocessing(Thread):
4✔
299
    """The active fires post processor."""
300

301
    def __init__(self, configfile, shp_borders, shp_mask, regional_filtermask=None):
4✔
302
        """Initialize the active fires post processor class."""
303
        super().__init__()
4✔
304
        self.shp_borders = shp_borders
4✔
305
        self.shp_filtermask = shp_mask
4✔
306

307
        self.regional_filtermask = regional_filtermask
4✔
308
        self.configfile = configfile
4✔
309
        self.options = {}
4✔
310

311
        config = read_config(self.configfile)
4✔
312
        self._set_options_from_config(config)
4✔
313

314
        self.host = socket.gethostname()
4✔
315
        self.timezone = self.options.get('timezone', 'GMT')
4✔
316

317
        self.input_topic = self.options['subscribe_topics'][0]
4✔
318
        self.output_topic = self.options['publish_topic']
4✔
319
        self.infile_pattern = self.options.get('af_pattern_ibands')
4✔
320

321
        self.outfile_patterns_national = config.get('output').get('national')
4✔
322
        self.outfile_patterns_regional = config.get('output').get('regional')
4✔
323

324
        self.output_dir = self.options.get('output_dir', '/tmp')
4✔
325

326
        self.filepath_detection_id_cache = self.options.get('filepath_detection_id_cache')
4✔
327

328
        frmt = self.options['regional_shapefiles_format']
4✔
329
        self.regional_shapefiles_globstr = globify(frmt)
4✔
330

331
        self._fire_detection_id = None
4✔
332
        self._initialize_fire_detection_id()
4✔
333

334
        self.listener = None
4✔
335
        self.publisher = None
4✔
336
        self.loop = False
4✔
337
        self._setup_and_start_communication()
4✔
338

339
    def _setup_and_start_communication(self):
4✔
340
        """Set up the Posttroll communication and start the publisher."""
341
        logger.debug("Starting up... Input topic: %s", self.input_topic)
×
342
        now = datetime_utc2local(datetime.now(), self.timezone)
×
343
        logger.debug("Output times for timezone: {zone} Now = {time}".format(zone=str(self.timezone), time=now))
×
344

345
        tic = time.time()
×
346
        units = {'temperature': 'degC'}
×
347
        self.unit_converter = UnitConverter(units)
×
348
        logger.debug("Unit conversion initialization with Pint took %f seconds.", time.time() - tic)
×
349

350
        self._check_borders_shapes_exists()
×
351

352
        self.listener = ListenerContainer(topics=[self.input_topic])
×
353
        self.publisher = NoisyPublisher("active_fires_postprocessing")
×
354
        self.publisher.start()
×
355
        self.loop = True
×
356
        signal.signal(signal.SIGTERM, self.signal_shutdown)
×
357

358
    def _set_options_from_config(self, config):
4✔
359
        """From the configuration on disk set the option dictionary, holding all metadata for processing."""
360
        for item in config:
4✔
361
            if not isinstance(config[item], dict):
4✔
362
                self.options[item] = config[item]
4✔
363

364
        if isinstance(self.options.get('subscribe_topics'), str):
4!
365
            subscribe_topics = self.options.get('subscribe_topics').split(',')
4✔
366
            for item in subscribe_topics:
4✔
367
                if len(item) == 0:
4!
368
                    subscribe_topics.remove(item)
×
369
            self.options['subscribe_topics'] = subscribe_topics
4✔
370

371
        if isinstance(self.options.get('publish_topics'), str):
4!
372
            publish_topics = self.options.get('publish_topics').split(',')
×
373
            for item in publish_topics:
×
374
                if len(item) == 0:
×
375
                    publish_topics.remove(item)
×
376
            self.options['publish_topics'] = publish_topics
×
377

378
    def signal_shutdown(self, *args, **kwargs):
4✔
379
        """Shutdown the Active Fires postprocessing."""
380
        self.close()
×
381

382
    def check_incoming_message_and_get_filename(self, msg):
4✔
383
        """Check the message content and return filename if okay."""
384
        if msg.type not in ['file', 'collection', 'dataset']:
4!
385
            logger.debug("Message type not supported: %s", str(msg.type))
×
386
            return None
×
387

388
        filename = get_filename_from_uri(msg.data.get('uri'))
4✔
389
        if not os.path.exists(filename):
4✔
390
            logger.warning("File does not exist: %s", filename)
4✔
391
            return None
4✔
392

393
        file_ok = check_file_type_okay(msg.data.get('type'))
4✔
394
        if not file_ok:
4✔
395
            output_messages = self._generate_no_fires_messages(msg, NO_FIRES_TEXT)
4✔
396
            for output_msg in output_messages:
4✔
397
                logger.debug("Sending message: %s", str(output_msg))
4✔
398
                self.publisher.send(str(output_msg))
4✔
399
            return None
4✔
400

401
        return filename
4✔
402

403
    def _national_save_and_publish(self, feature_collection, ndata, af_shapeff, msg, projname='default'):
4✔
404
        """Take the fearure collection and store the results in a Geojson file and publish."""
NEW
405
        if feature_collection is None:
×
NEW
406
            logger.info("No geojson file created, number of fires after filtering = %d", ndata)
×
NEW
407
            output_messages = self._generate_no_fires_messages(msg,
×
408
                                                               'No true fire detections inside National borders')  # noqa
NEW
409
            return
×
410

NEW
411
        fmda = af_shapeff.metadata
×
NEW
412
        if projname != 'default':
×
NEW
413
            pout = Parser(self.outfile_patterns_national[projname]['geojson_file_pattern'])
×
414
        else:
NEW
415
            pout = Parser(self.outfile_patterns_national['default']['geojson_file_pattern'])
×
416

NEW
417
        out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
×
NEW
418
        logger.debug("Output file path = %s", out_filepath)
×
419

NEW
420
        store_geojson(out_filepath, feature_collection)
×
NEW
421
        output_messages = self.get_output_messages(out_filepath, msg, ndata, proj_name=projname)
×
422

NEW
423
        for output_msg in output_messages:
×
NEW
424
            if output_msg:
×
NEW
425
                logger.debug("Sending message: %s", str(output_msg))
×
NEW
426
                self.publisher.send(str(output_msg))
×
427

428
    def do_postprocessing_on_message(self, msg, filename):
4✔
429
        """Do the fires post processing on a message."""
430
        platform_name = msg.data.get('platform_name')
×
431
        af_shapeff = ActiveFiresShapefileFiltering(filename, platform_name=platform_name,
×
432
                                                   timezone=self.timezone)
433
        afdata = af_shapeff.get_af_data(self.infile_pattern)
×
434
        if len(afdata) == 0:
×
435
            output_messages = self._generate_no_fires_messages(msg, NO_FIRES_TEXT)
×
436
            for output_msg in output_messages:
×
437
                logger.debug("Sending message: %s", str(output_msg))
×
438
                self.publisher.send(str(output_msg))
×
439
            return
×
440

441
        afdata = self.fires_filtering(msg, af_shapeff)
×
442
        logger.debug("After fires_filtering...: Number of fire detections = %d", len(afdata))
×
443
        if len(afdata) == 0:
×
444
            logger.debug("No fires - so no regional filtering to be done!")
×
445
            return
×
446

447
        afdata = self.add_unique_day_id(afdata)
×
448
        self.save_id_to_file()
×
UNCOV
449
        afdata = self.add_tb_celcius(afdata)
×
450

451
        # 1) Create geojson feature collection
452
        # 2) Dump geojson data to disk
453

NEW
454
        feature_collection = geojson_feature_collection_from_detections(afdata,
×
455
                                                                        platform_name=af_shapeff.platform_name)
456

NEW
457
        for proj_name in self.outfile_patterns_national:
×
NEW
458
            if proj_name == "default":
×
NEW
459
                self._national_save_and_publish(feature_collection, len(afdata), af_shapeff, msg)
×
460
            else:
NEW
461
                epsg_str = self.outfile_patterns_national[proj_name].get('projection')
×
NEW
462
                other_fc = map_coordinates_in_feature_collection(feature_collection, epsg_str)
×
NEW
463
                self._national_save_and_publish(other_fc, len(afdata), af_shapeff, msg, proj_name)
×
464

465
        # Do the regional filtering now:
466
        if not self.regional_filtermask:
×
467
            logger.info("No regional filtering is attempted.")
×
468
            return
×
469

470
        # FIXME! If afdata is empty (len=0) then it seems all data are inside all regions!
471
        af_shapeff = ActiveFiresShapefileFiltering(afdata=afdata, platform_name=platform_name,
×
472
                                                   timezone=self.timezone)
473
        regional_fmask = af_shapeff.get_regional_filtermasks(self.regional_filtermask,
×
474
                                                             globstr=self.regional_shapefiles_globstr)
475
        regional_messages = self.regional_fires_filtering_and_publishing(msg, regional_fmask, af_shapeff)
×
476
        for region_msg in regional_messages:
×
477
            logger.debug("Sending message: %s", str(region_msg))
×
478
            self.publisher.send(str(region_msg))
×
479

480
    def run(self):
4✔
481
        """Run the AF post processing."""
482
        while self.loop:
×
483
            try:
×
484
                msg = self.listener.output_queue.get(timeout=1)
×
485
                logger.debug("Message: %s", str(msg.data))
×
486
            except Empty:
×
487
                continue
×
488
            else:
489
                filename = self.check_incoming_message_and_get_filename(msg)
×
490
                if not filename:
×
491
                    continue
×
492

493
                self.do_postprocessing_on_message(msg, filename)
×
494

495
    def regional_fires_filtering_and_publishing(self, msg, regional_fmask, afsff_obj):
4✔
496
        """From the regional-fires-filter-mask and the fire detection data send regional messages."""
497
        logger.debug("Perform regional masking on VIIRS AF detections and publish accordingly.")
4✔
498

499
        afdata = afsff_obj.get_af_data()
4✔
500
        fmda = afsff_obj.metadata
4✔
501

502
        fmda['platform'] = afsff_obj.platform_name
4✔
503

504
        pout = Parser(self.outfile_patterns_regional['default']['geojson_file_pattern'])
4✔
505

506
        output_messages = []
4✔
507
        regions_with_detections = 0
4✔
508
        for region_name in regional_fmask:
4✔
509
            if not regional_fmask[region_name]['some_inside_test_area']:
4✔
510
                continue
4✔
511

512
            regions_with_detections = regions_with_detections + 1
4✔
513
            fmda['region_name'] = regional_fmask[region_name]['attributes']['Kod_omr']
4✔
514

515
            out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
4✔
516
            logger.debug("Output file path = %s", out_filepath)
4✔
517
            data_in_region = afdata[regional_fmask[region_name]['mask']]
4✔
518

519
            try:
4✔
520
                feature_collection = geojson_feature_collection_from_detections(data_in_region,
4✔
521
                                                                                platform_name=fmda['platform'])
522
            except ValueError:
×
523
                logger.warning("Something wrong happended storing regional " +
×
524
                               "data to Geojson - area: {name}".format(name=str(region_name)))
525
                continue
×
526

527
            store_geojson(out_filepath, feature_collection)
4✔
528

529
            outmsg = self._generate_output_message(out_filepath, msg, 'default',
4✔
530
                                                   regional_fmask[region_name])
531
            output_messages.append(outmsg)
4✔
532
            logger.info("Geojson file created! Number of fires in region = %d", len(data_in_region))
4✔
533

534
        logger.debug("Regional masking done. Number of regions with fire " +
4✔
535
                     "detections on this granule: %s" % str(regions_with_detections))
536
        return output_messages
4✔
537

538
    def fires_filtering(self, msg, af_shapeff):
4✔
539
        """Read Active Fire data and perform spatial filtering removing false detections.
540

541
        Do the national filtering first, and then filter out potential false
542
        detections by the special mask for that.
543

544
        """
545
        logger.debug("Read VIIRS AF detections and perform quality control and spatial filtering")
4✔
546

547
        fmda = af_shapeff.metadata
4✔
548
        # metdata contains time and everything but it is not being transfered to the dataframe.attrs
549

550
        pout = Parser(self.outfile_patterns_national['default']['geojson_file_pattern'])
4✔
551
        out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
4✔
552
        logger.debug("Output file path = %s", out_filepath)
4✔
553

554
        # National filtering:
555
        af_shapeff.fires_filtering(self.shp_borders)
4✔
556

557
        # Metadata should be transfered here!
558
        afdata_ff = af_shapeff.get_af_data()
4✔
559

560
        if len(afdata_ff) > 0:
4!
561
            logger.debug("Doing the fires filtering: shapefile-mask = %s", str(self.shp_filtermask))
4✔
562
            af_shapeff.fires_filtering(self.shp_filtermask, start_geometries_index=0, inside=False)
4✔
563
            afdata_ff = af_shapeff.get_af_data()
4✔
564
            logger.debug("After fires_filtering: Number of fire detections left: %d", len(afdata_ff))
4✔
565

566
        return afdata_ff
4✔
567

568
    def get_output_messages(self, filepath, msg, number_of_data, proj_name='default'):
4✔
569
        """Generate the adequate output message(s) depending on if an output file was created or not."""
570
        logger.info("Geojson file created! Number of fires = %d", number_of_data)
4✔
571
        return [self._generate_output_message(filepath, msg, proj_name)]
4✔
572

573
    def _generate_output_message(self, filepath, input_msg, proj_name, region=None):
4✔
574
        """Create the output message to publish."""
575
        output_topic = generate_posttroll_topic(self.output_topic, region)
4✔
576
        to_send = prepare_posttroll_message(input_msg, region)
4✔
577
        to_send['uri'] = str(filepath)
4✔
578
        to_send['uid'] = os.path.basename(filepath)
4✔
579
        to_send['type'] = 'GEOJSON-filtered'
4✔
580
        to_send['format'] = 'geojson'
4✔
581
        if proj_name == 'default':
4✔
582
            to_send['product'] = 'afimg'
4✔
583
        else:
584
            to_send['product'] = 'afimg_{proj_name}'.format(proj_name=proj_name)
4✔
585

586
        pubmsg = Message(output_topic, 'file', to_send)
4✔
587
        return pubmsg
4✔
588

589
    def _generate_no_fires_messages(self, input_msg, msg_string):
4✔
590
        """Create the output messages to publish."""
591
        to_send = prepare_posttroll_message(input_msg)
4✔
592
        to_send['info'] = msg_string
4✔
593
        publish_messages = []
4✔
594
        for ext in ['National', 'Regional']:
4✔
595
            topic = self.output_topic + '/' + ext
4✔
596
            publish_messages.append(Message(topic, 'info', to_send))
4✔
597

598
        return publish_messages
4✔
599

600
    def _check_borders_shapes_exists(self):
4✔
601
        """Check that the national borders shapefile exists on disk."""
602
        if not os.path.exists(self.shp_borders):
4✔
603
            raise OSError("Shape file does not exist! Filename = %s" % self.shp_borders)
4✔
604

605
    def _initialize_fire_detection_id(self):
4✔
606
        """Initialize the fire detection ID."""
607
        if self.filepath_detection_id_cache and os.path.exists(self.filepath_detection_id_cache):
4✔
608
            self._fire_detection_id = self.get_id_from_file()
4✔
609
        else:
610
            self._fire_detection_id = {'date': datetime.utcnow(), 'counter': 0}
4✔
611

612
    def update_fire_detection_id(self):
4✔
613
        """Update the fire detection ID registry."""
614
        now = datetime.utcnow()
4✔
615
        tdelta = now - self._fire_detection_id['date']
4✔
616
        if tdelta.total_seconds() > 24*3600:
4✔
617
            self._initialize_fire_detection_id()
4✔
618
        elif tdelta.total_seconds() > 0 and self._fire_detection_id['date'].day != now.day:
4✔
619
            self._initialize_fire_detection_id()
4✔
620

621
        self._fire_detection_id['counter'] = self._fire_detection_id['counter'] + 1
4✔
622

623
    def save_id_to_file(self):
4✔
624
        """Save the (current) detection id on disk.
625

626
        It is assumed that the user permissions are so that a file can actually
627
        be written to disk here!
628
        """
629
        with open(self.filepath_detection_id_cache, 'w') as fpt:
4✔
630
            id_ = self._create_id_string()
4✔
631
            fpt.write(id_)
4✔
632

633
    def get_id_from_file(self):
4✔
634
        """Read the latest stored detection id string from disk and convert to internal format."""
635
        with open(self.filepath_detection_id_cache, 'r') as fpt:
4✔
636
            idstr = fpt.read()
4✔
637

638
        return self._get_id_from_string(idstr)
4✔
639

640
    def _get_id_from_string(self, idstr):
4✔
641
        """Get the detection id from string."""
642
        datestr, counter = idstr.split('-')
4✔
643
        return {'date': datetime.strptime(datestr, '%Y%m%d'),
4✔
644
                'counter': int(counter)}
645

646
    def _create_id_string(self):
4✔
647
        """From the internal fire detection id create the id string to be exposed to the user."""
648
        return (self._fire_detection_id['date'].strftime('%Y%m%d') +
4✔
649
                '-' + str(self._fire_detection_id['counter']))
650

651
    def add_unique_day_id(self, data_frame):
4✔
652
        """Add a unique detection id - date + a running number for the day."""
653
        # Add id's to the detections:
654
        id_list = []
4✔
655
        for _i in range(len(data_frame)):
4✔
656
            self.update_fire_detection_id()
4✔
657
            id_ = self._create_id_string()
4✔
658
            id_list.append(id_)
4✔
659

660
        col = len(data_frame.columns)
4✔
661
        data_frame.insert(col, 'detection_id', id_list)
4✔
662
        return data_frame
4✔
663

664
    def add_tb_celcius(self, data_frame):
4✔
665
        """Add a column with TB in Celcius to the fire detection data frame."""
666
        def kelvin2celcius(x):
4✔
667
            tb_c = self.unit_converter.convert('temperature', x)
4✔
668
            return tb_c.magnitude
4✔
669

670
        col = data_frame.columns.get_loc('tb')
4✔
671
        data_frame.insert(2, "tb_celcius", data_frame.iloc[:, col].apply(kelvin2celcius))
4✔
672
        return data_frame
4✔
673

674
    def close(self):
4✔
675
        """Shutdown the Active Fires postprocessing."""
676
        logger.info('Terminating Active Fires post processing.')
×
677
        self.loop = False
×
678
        logger.info('Dumping the latest detection id to disk: %s', str(self.filepath_detection_id_cache))
×
679
        self.save_id_to_file()
×
680
        try:
×
681
            self.listener.stop()
×
682
        except Exception:
×
683
            logger.exception("Couldn't stop listener.")
×
684
        if self.publisher:
×
685
            try:
×
686
                self.publisher.stop()
×
687
            except Exception:
×
688
                logger.exception("Couldn't stop publisher.")
×
689

690

691
def check_file_type_okay(file_type):
4✔
692
    """Check if the file is of the correct type."""
693
    if file_type not in ['txt', 'TXT']:
4✔
694
        logger.info('File type not txt: %s', str(file_type))
4✔
695
        return False
4✔
696
    return True
4✔
697

698

699
def get_filename_from_uri(uri):
4✔
700
    """Get the file name from the uri given."""
701
    logger.info('File uri: %s', str(uri))
4✔
702
    url = urlparse(uri)
4✔
703
    return url.path
4✔
704

705

706
def generate_posttroll_topic(output_topic, region=None):
4✔
707
    """Create the topic for the posttroll message to publish."""
708
    if region:
4✔
709
        output_topic = output_topic + '/Regional/' + region['attributes']['Kod_omr']
4✔
710
    else:
711
        output_topic = output_topic + '/National'
4✔
712

713
    return output_topic
4✔
714

715

716
def prepare_posttroll_message(input_msg, region=None):
4✔
717
    """Create the basic posttroll-message fields and return."""
718
    to_send = input_msg.data.copy()
4✔
719
    to_send.pop('dataset', None)
4✔
720
    to_send.pop('collection', None)
4✔
721
    to_send.pop('uri', None)
4✔
722
    to_send.pop('uid', None)
4✔
723
    to_send.pop('format', None)
4✔
724
    to_send.pop('type', None)
4✔
725
    # FIXME! Check that the region_name is stored as a unicode string!
726
    if region:
4✔
727
        to_send['region_name'] = region['attributes']['Testomr']
4✔
728
        to_send['region_code'] = region['attributes']['Kod_omr']
4✔
729

730
    return to_send
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc