• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

adybbroe / activefires-pp / 5919507582

20 Aug 2023 08:39PM UTC coverage: 82.64% (+0.3%) from 82.34%
5919507582

Pull #17

github

Adam.Dybbroe
Merge branch 'sweref99_projection_output' of github.com:adybbroe/activefires-pp into sweref99_projection_output

# Conflicts:
#	activefires_pp/post_processing.py

Signed-off-by: Adam.Dybbroe <a000680@c21856.ad.smhi.se>
Pull Request #17: Refactor and add sweref99 output

283 of 439 branches covered (64.46%)

206 of 233 new or added lines in 9 files covered. (88.41%)

1 existing line in 1 file now uncovered.

1947 of 2356 relevant lines covered (82.64%)

3.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.27
/activefires_pp/post_processing.py
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3

4
# Copyright (c) 2021 - 2023 Adam.Dybbroe
5

6
# Author(s):
7

8
#   Adam Dybbroe <Firstname.Lastname@smhi.se>
9

10
# This program is free software: you can redistribute it and/or modify
11
# it under the terms of the GNU General Public License as published by
12
# the Free Software Foundation, either version 3 of the License, or
13
# (at your option) any later version.
14

15
# This program is distributed in the hope that it will be useful,
16
# but WITHOUT ANY WARRANTY; without even the implied warranty of
17
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18
# GNU General Public License for more details.
19

20
# You should have received a copy of the GNU General Public License
21
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
22

23
"""Post processing on the Active Fire detections."""
4✔
24

25
import socket
4✔
26
from trollsift import Parser, globify
4✔
27
import time
4✔
28
import pandas as pd
4✔
29
from datetime import datetime, timedelta
4✔
30
import numpy as np
4✔
31
import os
4✔
32
from six.moves.urllib.parse import urlparse
4✔
33

34
import logging
4✔
35
import signal
4✔
36
from queue import Empty
4✔
37
from threading import Thread
4✔
38
from posttroll.listener import ListenerContainer
4✔
39
from posttroll.message import Message
4✔
40
from posttroll.publisher import NoisyPublisher
4✔
41
import pyproj
4✔
42
from matplotlib.path import Path
4✔
43
import shapely
4✔
44

45
from activefires_pp.geojson_utils import store_geojson
4✔
46
from activefires_pp.geojson_utils import geojson_feature_collection_from_detections
4✔
47
from activefires_pp.geojson_utils import map_coordinates_in_feature_collection
4✔
48

49
from activefires_pp.utils import datetime_utc2local
4✔
50
from activefires_pp.utils import get_local_timezone_offset
4✔
51
from activefires_pp.config import read_config
4✔
52
from activefires_pp.geometries_from_shapefiles import ShapeGeometry
4✔
53

54
# M-band output:
55
# column 1: latitude of fire pixel (degrees)
56
# column 2: longitude of fire pixel (degrees)
57
# column 3: M13 brightness temperature of fire pixel (K)
58
# column 4: Along-scan fire pixel resolution (km)
59
# column 5: Along-track fire pixel resolution (km)
60
# column 6: detection confidence (%)
61
#           7=low, 8=nominal, 9=high (Imager Resolution)
62
#           0-100 % (Moderate Resolution)
63
# column 7: fire radiative power (MW)
64
# I-band output:
65
# column 1: latitude of fire pixel (degrees)
66
# column 2: longitude of fire pixel (degrees)
67
# column 3: I04 brightness temperature of fire pixel (K)
68
# column 4: Along-scan fire pixel resolution (km)
69
# column 5: Along-track fire pixel resolution (km)
70
# column 6: detection confidence ([7,8,9]->[lo,med,hi])
71
# column 7: fire radiative power (MW)
72
#
73
# Column names of the CSPP VIIRS Active Fires ASCII (txt) output files,
# as documented in the column description above.
COL_NAMES = ["latitude", "longitude", "tb", "along_scan_res", "along_track_res", "conf", "power"]

# Message text published when a granule contains no fire detections.
NO_FIRES_TEXT = 'No fire detections for this granule'


logger = logging.getLogger(__name__)
# Fiona is very chatty at INFO level - keep only warnings and above.
logging.getLogger("fiona").setLevel(logging.WARNING)
4✔
80

81

82
class ActiveFiresShapefileFiltering(object):
    """Reading, filtering and writing Active Fire detections.

    Reading either the CSPP VIIRS AF output (txt files) or the Geojson formatted files.
    Filtering for static and false alarms, and/or simply on geographical regions.
    Data is stored in geojson format.
    """

    def __init__(self, filepath=None, afdata=None, platform_name=None, timezone='GMT'):
        """Initialize the ActiveFiresShapefileFiltering class.

        Either *filepath* (a CSPP VIIRS-AF txt file to read later) or *afdata*
        (an already loaded pandas DataFrame) should be provided.
        """
        self.input_filepath = filepath
        self._afdata = afdata
        # When detections are handed over directly, inherit their metadata (attrs).
        self.metadata = {} if afdata is None else afdata.attrs
        self.timezone = timezone
        self.platform_name = platform_name

    def get_af_data(self, filepattern=None, localtime=True):
        """Read the Active Fire results from file - ascii formatted output from CSPP VIIRS-AF."""
        if self._afdata is not None:
            # Data already in memory - just sync the attrs with our metadata:
            self._afdata.attrs.update(self.metadata)
            return self._afdata

        if not self.input_filepath or not os.path.exists(self.input_filepath):
            # FIXME! Better to raise an exception!?
            return self._afdata

        if not filepattern:
            raise AttributeError("file pattern must be provided in order to be able to read from file!")

        self.metadata = self._get_metadata_from_filename(filepattern)
        self._afdata = _read_data(self.input_filepath)
        self._add_start_and_end_time_to_active_fires_data(localtime)
        return self._afdata

    def _get_metadata_from_filename(self, infile_pattern):
        """From the filename retrieve the metadata such as satellite and sensing time."""
        return get_metadata_from_filename(infile_pattern, self.input_filepath)

    def _add_start_and_end_time_to_active_fires_data(self, localtime):
        """Add 'starttime' and 'endtime' columns to the active fires data."""
        if localtime:
            logger.info("Convert to local time zone!")
        zone = self.timezone if localtime else 'GMT'

        # Naive datetimes are required for the numpy datetime64 conversion below.
        start_local = datetime_utc2local(self.metadata['start_time'], zone).replace(tzinfo=None)
        end_local = datetime_utc2local(self.metadata['end_time'], zone).replace(tzinfo=None)

        nrows = len(self._afdata)
        self._afdata['starttime'] = np.repeat(start_local, nrows).astype(np.datetime64)
        self._afdata['endtime'] = np.repeat(end_local, nrows).astype(np.datetime64)

        logger.info('Start and end times: %s %s',
                    str(self._afdata['starttime'][0]),
                    str(self._afdata['endtime'][0]))

    def _apply_timezone_offset(self, obstime):
        """Apply the time zone offset to the datetime objects."""
        offset = get_local_timezone_offset(self.timezone)
        return np.repeat(obstime.replace(tzinfo=None) + offset,
                         len(self._afdata)).astype(np.datetime64)

    def fires_filtering(self, shapefile, start_geometries_index=1, inside=True):
        """Remove fires outside National borders or filter out potential false detections.

        If *inside* is True the filtering will keep those detections that are inside the polygon.
        If *inside* is False the filtering will disregard the detections that are inside the polygon.
        """
        data = self._afdata
        lonlats = (data.longitude.values, data.latitude.values)

        t0 = time.time()
        inside_mask = get_global_mask_from_shapefile(shapefile, lonlats, start_geometries_index)
        logger.debug("Time used checking inside polygon - mpl path method: %f", time.time() - t0)

        # Keep (inside=True) or drop (inside=False) the points inside the polygon:
        self._afdata = data[inside_mask == inside]

        if len(self._afdata) == 0:
            logger.debug("No fires after filtering on Polygon...")
        else:
            logger.debug("Number of detections after filtering on Polygon: %d", len(self._afdata))

    def get_regional_filtermasks(self, shapefile, globstr):
        """Get the regional filter masks from the shapefile."""
        data = self._afdata
        lons = data.longitude.values
        lats = data.latitude.values

        logger.debug("Before ShapeGeometry instance - shapefile name = %s" % str(shapefile))
        logger.debug("Shape file glob-string = %s" % str(globstr))
        shape_geom = ShapeGeometry(shapefile, globstr)
        shape_geom.load()

        # Project the lon/lat positions into the shapefile's own coordinate system:
        proj = pyproj.Proj(shape_geom.proj4str)
        xcoords, ycoords = proj(lons, lats)
        pts = np.vstack([xcoords, ycoords]).T

        masks = {}
        for attributes, geom in zip(shape_geom.attributes, shape_geom.geometries):
            area_name = attributes['Testomr']
            logger.debug('Test area: {}'.format(str(area_name)))

            if isinstance(geom, shapely.geometry.multipolygon.MultiPolygon):
                area_mask = get_mask_from_multipolygon(pts, geom)
            else:
                area_mask = Path(geom.exterior.coords).contains_points(pts)

            num_inside = sum(area_mask)
            all_inside = num_inside == len(pts)
            # NOTE: with zero points both flags come out True - preserved from
            # the original logic (sum(mask) == len(points) == 0).
            some_inside = all_inside or num_inside > 0
            if all_inside:
                logger.debug("All points inside test area!")
            elif some_inside:
                logger.debug("Some points inside test area!")

            masks[area_name] = {'mask': area_mask,
                                'attributes': attributes,
                                'all_inside_test_area': all_inside,
                                'some_inside_test_area': some_inside}

        return masks
×
219

220

221
def _read_data(filepath):
4✔
222
    """Read the AF data."""
223
    with open(filepath, 'r') as fpt:
×
224
        return pd.read_csv(fpt, index_col=None, header=None, comment='#', names=COL_NAMES)
×
225

226

227
def get_metadata_from_filename(infile_pattern, filepath):
    """From the filename and its pattern get basic metadata of the satellite observations.

    Returns the trollsift parse result dict with an added 'end_time' entry,
    or None when the filename does not match the pattern.
    """
    fname = os.path.basename(filepath)
    try:
        res = Parser(infile_pattern).parse(fname)
    except ValueError:
        # Do something!
        return None

    # The pattern only carries an end *hour* - combine it with the start date.
    start = res['start_time']
    endtime = datetime(start.year, start.month, start.day,
                       res['end_hour'].hour, res['end_hour'].minute,
                       res['end_hour'].second)
    # Granules spanning midnight roll over to the next day:
    if endtime < start:
        endtime += timedelta(days=1)

    res['end_time'] = endtime
    return res
4✔
247

248

249
def store(output_filename, detections):
    """Store the filtered AF detections on disk as CSV.

    Returns the output filename, or None when there was nothing to store.
    """
    # NB: an explicit length check is required - truthiness is ambiguous
    # (and raises) for pandas DataFrames.
    if len(detections) == 0:
        logger.debug("No detections to save!")
        return None

    detections.to_csv(output_filename, index=False)
    return output_filename
×
257

258

259
def get_mask_from_multipolygon(points, geometry, start_idx=1):
    """Get mask for points from a shapely Multipolygon.

    Args:
        points: (N, 2) array of projected point coordinates.
        geometry: A shapely MultiPolygon.
        start_idx: Index of the first constituent polygon to scan after the
            initial test against polygon 0.

    Returns a boolean mask, True where a point is inside any polygon.

    Bugfix: the original sliced ``geometry.geoms[start_idx:]`` and then
    accessed ``.geoms`` on the slice. In Shapely 2.x slicing a
    GeometrySequence returns a plain Python list, which has no ``.geoms``
    attribute, so that code raises AttributeError. Materializing the
    sequence with ``list()`` and iterating the slice directly is
    equivalent on Shapely 1.x and correct on 2.x.
    """
    geoms = list(geometry.geoms)

    shape = geoms[0]
    pth = Path(shape.exterior.coords)
    mask = pth.contains_points(points)

    # Short-circuit: every point already inside the first polygon.
    if sum(mask) == len(points):
        return mask

    for shape in geoms[start_idx:]:
        pth = Path(shape.exterior.coords)
        mask = np.logical_or(mask, pth.contains_points(points))
        if sum(mask) == len(points):
            break

    return mask
4✔
276

277

278
def get_global_mask_from_shapefile(shapefile, lonlats, start_geom_index=0):
    """Given geographical (lon,lat) points get a mask to apply when filtering."""
    lons, lats = lonlats
    logger.debug("Getting the global mask from file: shapefile file path = %s", str(shapefile))

    shape_geom = ShapeGeometry(shapefile)
    shape_geom.load()

    # Project the lon/lat positions into the shapefile's coordinate system:
    proj = pyproj.Proj(shape_geom.proj4str)
    xcoords, ycoords = proj(lons, lats)
    candidate_points = np.vstack([xcoords, ycoords]).T

    # There is only one geometry/multi-polygon in these shapefiles!
    return get_mask_from_multipolygon(candidate_points,
                                      shape_geom.geometries[0],
                                      start_geom_index)
4✔
294

295

296
class ActiveFiresPostprocessing(Thread):
    """The active fires post processor.

    Listens for posttroll messages pointing at CSPP VIIRS Active Fires output,
    performs national and regional spatial filtering of the detections, stores
    the results as Geojson files and publishes messages about them.
    """

    def __init__(self, configfile, shp_borders, shp_mask, regional_filtermask=None):
        """Initialize the active fires post processor class.

        Args:
            configfile: Path to the configuration file.
            shp_borders: Shapefile with national borders - detections outside are dropped.
            shp_mask: Shapefile with known false-alarm areas - detections inside are dropped.
            regional_filtermask: Optional shapefile used for regional filtering.
        """
        super().__init__()
        self.shp_borders = shp_borders
        self.shp_filtermask = shp_mask

        self.regional_filtermask = regional_filtermask
        self.configfile = configfile
        self.options = {}

        config = read_config(self.configfile)
        self._set_options_from_config(config)

        self.host = socket.gethostname()
        self.timezone = self.options.get('timezone', 'GMT')

        self.input_topic = self.options['subscribe_topics'][0]
        self.output_topic = self.options['publish_topic']
        self.infile_pattern = self.options.get('af_pattern_ibands')

        # Output file patterns per projection name ('default' plus optional others):
        self.outfile_patterns_national = config.get('output').get('national')
        self.outfile_patterns_regional = config.get('output').get('regional')

        self.output_dir = self.options.get('output_dir', '/tmp')

        self.filepath_detection_id_cache = self.options.get('filepath_detection_id_cache')

        frmt = self.options['regional_shapefiles_format']
        self.regional_shapefiles_globstr = globify(frmt)

        self._fire_detection_id = None
        self._initialize_fire_detection_id()

        self.listener = None
        self.publisher = None
        self.loop = False
        self._setup_and_start_communication()

    def _setup_and_start_communication(self):
        """Set up the Posttroll communication and start the publisher."""
        logger.debug("Starting up... Input topic: %s", self.input_topic)
        now = datetime_utc2local(datetime.now(), self.timezone)
        logger.debug("Output times for timezone: {zone} Now = {time}".format(zone=str(self.timezone), time=now))

        self._check_borders_shapes_exists()

        self.listener = ListenerContainer(topics=[self.input_topic])
        self.publisher = NoisyPublisher("active_fires_postprocessing")
        self.publisher.start()
        self.loop = True
        # Allow a clean shutdown on SIGTERM:
        signal.signal(signal.SIGTERM, self.signal_shutdown)

    def _set_options_from_config(self, config):
        """From the configuration on disk set the option dictionary, holding all metadata for processing."""
        for item in config:
            if not isinstance(config[item], dict):
                self.options[item] = config[item]

        # Comma-separated topic strings are split into lists; empty entries
        # (e.g. from a trailing comma) are dropped.
        # Bugfix: the original removed items from the list while iterating it,
        # which skips elements when several empty entries are adjacent.
        if isinstance(self.options.get('subscribe_topics'), str):
            self.options['subscribe_topics'] = [
                topic for topic in self.options.get('subscribe_topics').split(',')
                if len(topic) > 0]

        if isinstance(self.options.get('publish_topics'), str):
            self.options['publish_topics'] = [
                topic for topic in self.options.get('publish_topics').split(',')
                if len(topic) > 0]

    def signal_shutdown(self, *args, **kwargs):
        """Shutdown the Active Fires postprocessing."""
        self.close()

    def check_incoming_message_and_get_filename(self, msg):
        """Check the message content and return filename if okay.

        Returns None (after publishing no-fires messages where appropriate)
        when the message cannot be processed.
        """
        if msg.type not in ['file', 'collection', 'dataset']:
            logger.debug("Message type not supported: %s", str(msg.type))
            return None

        filename = get_filename_from_uri(msg.data.get('uri'))
        if not os.path.exists(filename):
            logger.warning("File does not exist: %s", filename)
            return None

        file_ok = check_file_type_okay(msg.data.get('type'))
        if not file_ok:
            output_messages = self._generate_no_fires_messages(msg, NO_FIRES_TEXT)
            for output_msg in output_messages:
                logger.debug("Sending message: %s", str(output_msg))
                self.publisher.send(str(output_msg))
            return None

        return filename

    def _national_save_and_publish(self, feature_collection, ndata, af_shapeff, msg, projname='default'):
        """Take the feature collection and store the results in a Geojson file and publish."""
        if feature_collection is None:
            logger.info("No geojson file created, number of fires after filtering = %d", ndata)
            output_messages = self._generate_no_fires_messages(msg,
                                                               'No true fire detections inside National borders')  # noqa
            # Bugfix: these messages used to be generated but never published.
            for output_msg in output_messages:
                logger.debug("Sending message: %s", str(output_msg))
                self.publisher.send(str(output_msg))
            return

        fmda = af_shapeff.metadata
        # Bugfix: the default branch used to look up the misspelled config key
        # 'deafult' (raising KeyError) - both cases use *projname* directly now.
        pout = Parser(self.outfile_patterns_national[projname]['geojson_file_pattern'])

        out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
        logger.debug("Output file path = %s", out_filepath)

        store_geojson(out_filepath, feature_collection)
        output_messages = self.get_output_messages(out_filepath, msg, ndata, proj_name=projname)

        for output_msg in output_messages:
            if output_msg:
                logger.debug("Sending message: %s", str(output_msg))
                self.publisher.send(str(output_msg))

    def do_postprocessing_on_message(self, msg, filename):
        """Do the fires post processing on a message."""
        platform_name = msg.data.get('platform_name')
        af_shapeff = ActiveFiresShapefileFiltering(filename, platform_name=platform_name,
                                                   timezone=self.timezone)
        afdata = af_shapeff.get_af_data(self.infile_pattern)
        if len(afdata) == 0:
            output_messages = self._generate_no_fires_messages(msg, NO_FIRES_TEXT)
            for output_msg in output_messages:
                logger.debug("Sending message: %s", str(output_msg))
                self.publisher.send(str(output_msg))
            return

        afdata = self.fires_filtering(msg, af_shapeff)
        logger.debug("After fires_filtering...: Number of fire detections = %d", len(afdata))
        if len(afdata) == 0:
            logger.debug("No fires - so no regional filtering to be done!")
            return

        # It is here that we should add a unique day-ID to each of the detections!
        afdata = self.add_unique_day_id(afdata)
        self.save_id_to_file()

        # 1) Create geojson feature collection
        # 2) Dump geojson data to disk

        feature_collection = geojson_feature_collection_from_detections(afdata,
                                                                        platform_name=af_shapeff.platform_name)

        # Store/publish once per configured output projection:
        for proj_name in self.outfile_patterns_national:
            if proj_name == "default":
                self._national_save_and_publish(feature_collection, len(afdata), af_shapeff, msg)
            else:
                epsg_str = self.outfile_patterns_national[proj_name].get('projection')
                other_fc = map_coordinates_in_feature_collection(feature_collection, epsg_str)
                self._national_save_and_publish(other_fc, len(afdata), af_shapeff, msg, proj_name)

        # Do the regional filtering now:
        if not self.regional_filtermask:
            logger.info("No regional filtering is attempted.")
            return

        # FIXME! If afdata is empty (len=0) then it seems all data are inside all regions!
        af_shapeff = ActiveFiresShapefileFiltering(afdata=afdata, platform_name=platform_name,
                                                   timezone=self.timezone)
        regional_fmask = af_shapeff.get_regional_filtermasks(self.regional_filtermask,
                                                             globstr=self.regional_shapefiles_globstr)
        regional_messages = self.regional_fires_filtering_and_publishing(msg, regional_fmask, af_shapeff)
        for region_msg in regional_messages:
            logger.debug("Sending message: %s", str(region_msg))
            self.publisher.send(str(region_msg))

    def run(self):
        """Run the AF post processing."""
        while self.loop:
            try:
                msg = self.listener.output_queue.get(timeout=1)
                logger.debug("Message: %s", str(msg.data))
            except Empty:
                continue
            else:
                filename = self.check_incoming_message_and_get_filename(msg)
                if not filename:
                    continue

                self.do_postprocessing_on_message(msg, filename)

    def regional_fires_filtering_and_publishing(self, msg, regional_fmask, afsff_obj):
        """From the regional-fires-filter-mask and the fire detection data send regional messages."""
        logger.debug("Perform regional masking on VIIRS AF detections and publish accordingly.")

        afdata = afsff_obj.get_af_data()
        fmda = afsff_obj.metadata

        fmda['platform'] = afsff_obj.platform_name

        pout = Parser(self.outfile_patterns_regional['default']['geojson_file_pattern'])

        output_messages = []
        regions_with_detections = 0
        for region_name in regional_fmask:
            if not regional_fmask[region_name]['some_inside_test_area']:
                continue

            regions_with_detections = regions_with_detections + 1
            fmda['region_name'] = regional_fmask[region_name]['attributes']['Kod_omr']

            out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
            logger.debug("Output file path = %s", out_filepath)
            data_in_region = afdata[regional_fmask[region_name]['mask']]

            feature_collection = geojson_feature_collection_from_detections(data_in_region,
                                                                            platform_name=fmda['platform'])
            if feature_collection is None:
                logger.warning("Something wrong happened storing regional " +
                               "data to Geojson - area: {name}".format(name=str(region_name)))
                continue

            store_geojson(out_filepath, feature_collection)

            outmsg = self._generate_output_message(out_filepath, msg, 'default',
                                                   regional_fmask[region_name])
            output_messages.append(outmsg)
            logger.info("Geojson file created! Number of fires in region = %d", len(data_in_region))

        logger.debug("Regional masking done. Number of regions with fire " +
                     "detections on this granule: %s" % str(regions_with_detections))
        return output_messages

    def fires_filtering(self, msg, af_shapeff):
        """Read Active Fire data and perform spatial filtering removing false detections.

        Do the national filtering first, and then filter out potential false
        detections by the special mask for that.

        """
        logger.debug("Read VIIRS AF detections and perform quality control and spatial filtering")

        fmda = af_shapeff.metadata
        # metadata contains time and everything but it is not being transfered to the dataframe.attrs

        pout = Parser(self.outfile_patterns_national['default']['geojson_file_pattern'])
        out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
        logger.debug("Output file path = %s", out_filepath)

        # National filtering:
        af_shapeff.fires_filtering(self.shp_borders)

        # Metadata should be transfered here!
        afdata_ff = af_shapeff.get_af_data()

        if len(afdata_ff) > 0:
            # Filter out known false alarms (keep what is OUTSIDE the mask):
            logger.debug("Doing the fires filtering: shapefile-mask = %s", str(self.shp_filtermask))
            af_shapeff.fires_filtering(self.shp_filtermask, start_geometries_index=0, inside=False)
            afdata_ff = af_shapeff.get_af_data()
            logger.debug("After fires_filtering: Number of fire detections left: %d", len(afdata_ff))

        return afdata_ff

    def get_output_messages(self, filepath, msg, number_of_data, proj_name='default'):
        """Generate the adequate output message(s) depending on if an output file was created or not."""
        logger.info("Geojson file created! Number of fires = %d", number_of_data)
        return [self._generate_output_message(filepath, msg, proj_name)]

    def _generate_output_message(self, filepath, input_msg, proj_name, region=None):
        """Create the output message to publish."""
        output_topic = generate_posttroll_topic(self.output_topic, region)
        to_send = prepare_posttroll_message(input_msg, region)
        to_send['uri'] = ('ssh://%s/%s' % (self.host, filepath))
        to_send['uid'] = os.path.basename(filepath)
        to_send['type'] = 'GEOJSON-filtered'
        to_send['format'] = 'geojson'
        if proj_name == 'default':
            to_send['product'] = 'afimg'
        else:
            to_send['product'] = 'afimg_{proj_name}'.format(proj_name=proj_name)

        pubmsg = Message(output_topic, 'file', to_send)
        return pubmsg

    def _generate_no_fires_messages(self, input_msg, msg_string):
        """Create the output messages to publish."""
        to_send = prepare_posttroll_message(input_msg)
        to_send['info'] = msg_string
        publish_messages = []
        for ext in ['National', 'Regional']:
            topic = self.output_topic + '/' + ext
            publish_messages.append(Message(topic, 'info', to_send))

        return publish_messages

    def _check_borders_shapes_exists(self):
        """Check that the national borders shapefile exists on disk.

        Raises:
            OSError: If the shapefile is not found on disk.
        """
        if not os.path.exists(self.shp_borders):
            raise OSError("Shape file does not exist! Filename = %s" % self.shp_borders)

    def _initialize_fire_detection_id(self):
        """Initialize the fire detection ID, from cache file if available."""
        if self.filepath_detection_id_cache and os.path.exists(self.filepath_detection_id_cache):
            self._fire_detection_id = self.get_id_from_file()
        else:
            self._fire_detection_id = {'date': datetime.utcnow(), 'counter': 0}

    def update_fire_detection_id(self):
        """Update the fire detection ID registry.

        The counter is reset (re-initialized) when a new day has started,
        then incremented by one.
        """
        now = datetime.utcnow()
        tdelta = now - self._fire_detection_id['date']
        if tdelta.total_seconds() > 24*3600:
            self._initialize_fire_detection_id()
        elif tdelta.total_seconds() > 0 and self._fire_detection_id['date'].day != now.day:
            self._initialize_fire_detection_id()

        self._fire_detection_id['counter'] = self._fire_detection_id['counter'] + 1

    def save_id_to_file(self):
        """Save the (current) detection id on disk.

        It is assumed that the user permissions are so that a file can actually
        be written to disk here!
        """
        with open(self.filepath_detection_id_cache, 'w') as fpt:
            id_ = self._create_id_string()
            fpt.write(id_)

    def get_id_from_file(self):
        """Read the latest stored detection id string from disk and convert to internal format."""
        with open(self.filepath_detection_id_cache, 'r') as fpt:
            idstr = fpt.read()

        return self._get_id_from_string(idstr)

    def _get_id_from_string(self, idstr):
        """Get the detection id from string (e.g. '20230817-3')."""
        datestr, counter = idstr.split('-')
        return {'date': datetime.strptime(datestr, '%Y%m%d'),
                'counter': int(counter)}

    def _create_id_string(self):
        """From the internal fire detection id create the id string to be exposed to the user."""
        return (self._fire_detection_id['date'].strftime('%Y%m%d') +
                '-' + str(self._fire_detection_id['counter']))

    def add_unique_day_id(self, afdata):
        """Add a unique detection id - date + a running number for the day."""
        # Add id's to the detections:
        id_list = []
        for _i in range(len(afdata)):
            self.update_fire_detection_id()
            id_ = self._create_id_string()
            id_list.append(id_)

        afdata['detection_id'] = id_list
        return afdata

    def close(self):
        """Shutdown the Active Fires postprocessing."""
        logger.info('Terminating Active Fires post processing.')
        self.loop = False
        logger.info('Dumping the latest detection id to disk: %s', str(self.filepath_detection_id_cache))
        self.save_id_to_file()
        try:
            self.listener.stop()
        except Exception:
            logger.exception("Couldn't stop listener.")
        if self.publisher:
            try:
                self.publisher.stop()
            except Exception:
                logger.exception("Couldn't stop publisher.")
×
671

672

673
def check_file_type_okay(file_type):
    """Check if the file is of the correct type (ascii 'txt' output)."""
    if file_type in ('txt', 'TXT'):
        return True
    logger.info('File type not txt: %s', str(file_type))
    return False
4✔
679

680

681
def get_filename_from_uri(uri):
    """Get the file name (path component) from the uri given."""
    logger.info('File uri: %s', str(uri))
    return urlparse(uri).path
4✔
686

687

688
def generate_posttroll_topic(output_topic, region=None):
    """Create the topic for the posttroll message to publish.

    With a *region* dict the topic gets a '/Regional/<code>' suffix,
    otherwise a '/National' suffix.
    """
    if region:
        return output_topic + '/Regional/' + region['attributes']['Kod_omr']
    return output_topic + '/National'
4✔
696

697

698
def prepare_posttroll_message(input_msg, region=None):
    """Create the basic posttroll-message fields and return.

    Copies the incoming message data, strips the fields that will be
    replaced by the outgoing product metadata, and optionally adds the
    region name/code.
    """
    to_send = input_msg.data.copy()
    for key in ('dataset', 'collection', 'uri', 'uid', 'format', 'type'):
        to_send.pop(key, None)
    # FIXME! Check that the region_name is stored as a unicode string!
    if region:
        to_send['region_name'] = region['attributes']['Testomr']
        to_send['region_code'] = region['attributes']['Kod_omr']

    return to_send
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc