• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

adybbroe / activefires-pp / 5544472065

pending completion
5544472065

Pull #16

github

web-flow
Merge 113deaed1 into 46b894fa5
Pull Request #16: Feature add unique detection

259 of 413 branches covered (62.71%)

306 of 352 new or added lines in 6 files covered. (86.93%)

3 existing lines in 2 files now uncovered.

1875 of 2279 relevant lines covered (82.27%)

3.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.74
/activefires_pp/post_processing.py
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3

4
# Copyright (c) 2021 - 2023 Adam Dybbroe
5

6
# Author(s):
7

8
#   Adam Dybbroe <Firstname.Lastname@smhi.se>
9

10
# This program is free software: you can redistribute it and/or modify
11
# it under the terms of the GNU General Public License as published by
12
# the Free Software Foundation, either version 3 of the License, or
13
# (at your option) any later version.
14

15
# This program is distributed in the hope that it will be useful,
16
# but WITHOUT ANY WARRANTY; without even the implied warranty of
17
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18
# GNU General Public License for more details.
19

20
# You should have received a copy of the GNU General Public License
21
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
22

23
"""Post processing on the Active Fire detections."""
4✔
24

25
import socket
4✔
26
from trollsift import Parser, globify
4✔
27
import time
4✔
28
import pandas as pd
4✔
29
from datetime import datetime, timedelta
4✔
30
import numpy as np
4✔
31
import os
4✔
32
from six.moves.urllib.parse import urlparse
4✔
33
from geojson import Feature, Point, FeatureCollection, dump
4✔
34
import logging
4✔
35
import signal
4✔
36
from queue import Empty
4✔
37
from threading import Thread
4✔
38
from posttroll.listener import ListenerContainer
4✔
39
from posttroll.message import Message
4✔
40
from posttroll.publisher import NoisyPublisher
4✔
41
import pyproj
4✔
42
from matplotlib.path import Path
4✔
43
import shapely
4✔
44

45
from activefires_pp.utils import datetime_utc2local
4✔
46
from activefires_pp.utils import get_local_timezone_offset
4✔
47
from activefires_pp.utils import json_serial
4✔
48
from activefires_pp.config import read_config
4✔
49
from activefires_pp.geometries_from_shapefiles import ShapeGeometry
4✔
50

51
# Column layout of the CSPP VIIRS Active Fires ascii output.
#
# M-band output:
# column 1: latitude of fire pixel (degrees)
# column 2: longitude of fire pixel (degrees)
# column 3: M13 brightness temperature of fire pixel (K)
# column 4: Along-scan fire pixel resolution (km)
# column 5: Along-track fire pixel resolution (km)
# column 6: detection confidence (%)
#           7=low, 8=nominal, 9=high (Imager Resolution)
#           0-100 % (Moderate Resolution)
# column 7: fire radiative power (MW)
#
# I-band output:
# column 1: latitude of fire pixel (degrees)
# column 2: longitude of fire pixel (degrees)
# column 3: I04 brightness temperature of fire pixel (K)
# column 4: Along-scan fire pixel resolution (km)
# column 5: Along-track fire pixel resolution (km)
# column 6: detection confidence ([7,8,9]->[lo,med,hi])
# column 7: fire radiative power (MW)

# DataFrame column names matching the columns documented above.
COL_NAMES = ["latitude", "longitude", "tb", "along_scan_res", "along_track_res", "conf", "power"]

# Info string published when a granule holds no fire detections.
NO_FIRES_TEXT = 'No fire detections for this granule'


logger = logging.getLogger(__name__)
# Fiona is chatty at INFO level; keep only warnings and above.
logging.getLogger("fiona").setLevel(logging.WARNING)
77

78

79
class ActiveFiresShapefileFiltering(object):
    """Reading, filtering and writing Active Fire detections.

    Reading either the CSPP VIIRS AF output (txt files) or the Geojson formatted files.
    Filtering for static and false alarms, and/or simply on geographical regions.
    Data is stored in geojson format.
    """

    def __init__(self, filepath=None, afdata=None, platform_name=None, timezone='GMT'):
        """Initialize the ActiveFiresShapefileFiltering class.

        Args:
            filepath: Path to a CSPP VIIRS-AF ascii output file to read from.
            afdata: Already-loaded detection data (pandas DataFrame). When
                given, its ``attrs`` are taken as the observation metadata.
            platform_name: Satellite platform name, used for the output.
            timezone: Time zone name for local-time conversion of observation times.
        """
        self.input_filepath = filepath
        self._afdata = afdata
        if afdata is None:
            self.metadata = {}
        else:
            # Metadata travels on the DataFrame attrs when data is handed over directly.
            self.metadata = afdata.attrs

        self.timezone = timezone
        self.platform_name = platform_name

    def get_af_data(self, filepattern=None, localtime=True):
        """Read the Active Fire results from file - ascii formatted output from CSPP VIIRS-AF.

        If data has already been loaded it is returned as-is (with metadata
        synced onto its attrs). Otherwise the input file is parsed, and start
        and end times (optionally converted to local time) are added.
        """
        if self._afdata is not None:
            # Make sure the attrs are populated with metadata instance attribute
            self._afdata.attrs.update(self.metadata)
            return self._afdata

        if not self.input_filepath or not os.path.exists(self.input_filepath):
            # FIXME! Better to raise an exception!?
            return self._afdata

        if not filepattern:
            raise AttributeError("file pattern must be provided in order to be able to read from file!")

        self.metadata = self._get_metadata_from_filename(filepattern)
        self._afdata = _read_data(self.input_filepath)
        self._add_start_and_end_time_to_active_fires_data(localtime)

        return self._afdata

    def _get_metadata_from_filename(self, infile_pattern):
        """From the filename retrieve the metadata such as satellite and sensing time."""
        return get_metadata_from_filename(infile_pattern, self.input_filepath)

    def _add_start_and_end_time_to_active_fires_data(self, localtime):
        """Add start and end time to active fires data.

        The granule start/end times from the metadata are repeated onto every
        detection row as naive (tz-less) numpy datetime64 values.
        """
        if localtime:
            logger.info("Convert to local time zone!")
            starttime = datetime_utc2local(self.metadata['start_time'], self.timezone)
            endtime = datetime_utc2local(self.metadata['end_time'], self.timezone)
        else:
            starttime = datetime_utc2local(self.metadata['start_time'], 'GMT')
            endtime = datetime_utc2local(self.metadata['end_time'], 'GMT')

        # Strip tzinfo so the values can be stored as numpy datetime64.
        starttime = starttime.replace(tzinfo=None)
        endtime = endtime.replace(tzinfo=None)

        self._afdata['starttime'] = np.repeat(starttime, len(self._afdata)).astype(np.datetime64)
        self._afdata['endtime'] = np.repeat(endtime, len(self._afdata)).astype(np.datetime64)

        logger.info('Start and end times: %s %s',
                    str(self._afdata['starttime'][0]),
                    str(self._afdata['endtime'][0]))

    def _apply_timezone_offset(self, obstime):
        """Apply the time zone offset to the datetime objects.

        Returns the (naive) observation time shifted by the configured
        timezone offset, repeated once per detection row.
        """
        obstime_offset = get_local_timezone_offset(self.timezone)
        return np.repeat(obstime.replace(tzinfo=None) + obstime_offset,
                         len(self._afdata)).astype(np.datetime64)

    def fires_filtering(self, shapefile, start_geometries_index=1, inside=True):
        """Remove fires outside National borders or filter out potential false detections.

        If *inside* is True the filtering will keep those detections that are inside the polygon.
        If *inside* is False the filtering will disregard the detections that are inside the polygon.
        """
        detections = self._afdata

        lons = detections.longitude.values
        lats = detections.latitude.values

        toc = time.time()
        points_inside = get_global_mask_from_shapefile(shapefile, (lons, lats), start_geometries_index)
        logger.debug("Time used checking inside polygon - mpl path method: %f", time.time() - toc)

        # Keep rows whose inside/outside status matches the requested polarity.
        self._afdata = detections[points_inside == inside]

        if len(self._afdata) == 0:
            logger.debug("No fires after filtering on Polygon...")
        else:
            logger.debug("Number of detections after filtering on Polygon: %d", len(self._afdata))

    def get_regional_filtermasks(self, shapefile, globstr):
        """Get the regional filter masks from the shapefile.

        Returns a dict keyed on the region ('Testomr' attribute), each entry
        holding the boolean point mask, the shapefile attributes, and flags
        telling whether all/some detections fall inside that region.
        """
        detections = self._afdata

        lons = detections.longitude.values
        lats = detections.latitude.values

        logger.debug("Before ShapeGeometry instance - shapefile name = %s" % str(shapefile))
        logger.debug("Shape file glob-string = %s" % str(globstr))
        shape_geom = ShapeGeometry(shapefile, globstr)
        shape_geom.load()

        # Project the lon/lat detections into the shapefile's CRS before the
        # point-in-polygon tests.
        p__ = pyproj.Proj(shape_geom.proj4str)
        metersx, metersy = p__(lons, lats)
        points = np.vstack([metersx, metersy]).T

        regional_masks = {}

        for attr, geometry in zip(shape_geom.attributes, shape_geom.geometries):
            test_omr = attr['Testomr']
            all_inside_test_omr = False
            some_inside_test_omr = False
            logger.debug(u'Test area: {}'.format(str(test_omr)))

            regional_masks[test_omr] = {'mask': None, 'attributes': attr}

            if isinstance(geometry, shapely.geometry.multipolygon.MultiPolygon):
                regional_masks[test_omr]['mask'] = get_mask_from_multipolygon(points, geometry)
            else:
                shape = geometry
                pth = Path(shape.exterior.coords)
                regional_masks[test_omr]['mask'] = pth.contains_points(points)

            if sum(regional_masks[test_omr]['mask']) == len(points):
                all_inside_test_omr = True
                some_inside_test_omr = True
                logger.debug("All points inside test area!")
            elif sum(regional_masks[test_omr]['mask']) > 0:
                some_inside_test_omr = True
                logger.debug("Some points inside test area!")

            regional_masks[test_omr]['all_inside_test_area'] = all_inside_test_omr
            regional_masks[test_omr]['some_inside_test_area'] = some_inside_test_omr

        return regional_masks

217

218
def _read_data(filepath):
    """Parse a CSPP VIIRS-AF ascii file into a pandas DataFrame.

    Comment lines (starting with '#') are skipped and the columns are named
    according to the module-level COL_NAMES.
    """
    with open(filepath, 'r') as stream:
        frame = pd.read_csv(stream, index_col=None, header=None, comment='#', names=COL_NAMES)
    return frame

223

224
def get_metadata_from_filename(infile_pattern, filepath):
    """From the filename and its pattern get basic metadata of the satellite observations.

    Args:
        infile_pattern: A trollsift pattern describing the filename layout.
        filepath: Path of the input file; only its basename is parsed.

    Returns:
        A dict with the parsed fields plus a computed 'end_time', or None
        when the filename does not match the pattern.
    """
    p__ = Parser(infile_pattern)
    fname = os.path.basename(filepath)
    try:
        res = p__.parse(fname)
    except ValueError:
        # Don't swallow the mismatch silently - make it visible in the logs,
        # but keep returning None so existing callers are unaffected.
        logger.warning("Failed to parse filename %s with pattern %s", fname, infile_pattern)
        return None

    # Fix the end time: the filename only carries hour/minute/second for the
    # end of the granule, so combine it with the start date and roll over
    # midnight when the end appears to precede the start.
    endtime = datetime(res['start_time'].year, res['start_time'].month,
                       res['start_time'].day, res['end_hour'].hour, res['end_hour'].minute,
                       res['end_hour'].second)
    if endtime < res['start_time']:
        endtime = endtime + timedelta(days=1)

    res['end_time'] = endtime

    return res
244

245

246
def store(output_filename, detections):
    """Store the filtered AF detections on disk.

    Writes the detections DataFrame as CSV and returns the filename, or
    returns None (after a debug log) when there is nothing to store.
    """
    if len(detections) == 0:
        logger.debug("No detections to save!")
        return None

    detections.to_csv(output_filename, index=False)
    return output_filename
254

255

256
def geojson_feature_collection_from_detections(detections, platform_name=None):
    """Create the Geojson feature collection from fire detection data.

    Each detection row becomes a Point feature carrying fire radiative power,
    brightness temperature, confidence and the mean granule observation time.
    Returns None when there are no detections.
    """
    if len(detections) == 0:
        logger.debug("No detections to save!")
        return None

    # Convert points to GeoJSON
    features = []
    for idx in range(len(detections)):
        row = detections.iloc[idx]
        start = row.starttime.to_pydatetime()
        end = row.endtime.to_pydatetime()
        # Observation time = midpoint of the granule time span.
        mean_granule_time = start + (end - start) / 2.

        properties = {'power': row.power,
                      'tb': row.tb,
                      'confidence': int(row.conf),
                      'observation_time': json_serial(mean_granule_time)
                      }
        if platform_name:
            properties['platform_name'] = platform_name
        else:
            logger.debug("No platform name specified for output")

        point = Point(map(float, [row.longitude, row.latitude]))
        features.append(Feature(geometry=point, properties=properties))

    return FeatureCollection(features)
286

287

288
def store_geojson(output_filename, feature_collection):
    """Store the Geojson feature collection of fire detections on disk.

    Creates the output directory when needed. Previously a bare filename
    (empty dirname) would crash os.makedirs(''), and two processes creating
    the directory at the same time could race; both are fixed by guarding on
    a non-empty path and using exist_ok=True.
    """
    path = os.path.dirname(output_filename)
    if path and not os.path.exists(path):
        logger.info("Create directory: %s", path)
        os.makedirs(path, exist_ok=True)

    with open(output_filename, 'w') as fpt:
        dump(feature_collection, fpt)
297

298

299
def get_mask_from_multipolygon(points, geometry, start_idx=1):
    """Get mask for points from a shapely Multipolygon.

    Tests each point of *points* (N x 2 array of projected coordinates)
    against the constituent polygons of *geometry*, OR-ing the per-polygon
    containment masks together. Polygons before *start_idx* (other than the
    first) are skipped. Stops early once every point is covered.
    """
    # Start with the first constituent polygon.
    shape = geometry.geoms[0]
    pth = Path(shape.exterior.coords)
    mask = pth.contains_points(points)

    if sum(mask) == len(points):
        # All points already inside - no need to test the remaining polygons.
        return mask

    # NOTE(review): slicing `geometry.geoms` and then iterating the `.geoms`
    # attribute of the slice is shapely-version dependent (newer shapely
    # returns a plain list from the slice) - confirm against the pinned
    # shapely version.
    constituent_part = geometry.geoms[start_idx:]
    for shape in constituent_part.geoms:
        pth = Path(shape.exterior.coords)
        mask = np.logical_or(mask, pth.contains_points(points))
        if sum(mask) == len(points):
            break

    return mask
316

317

318
def get_global_mask_from_shapefile(shapefile, lonlats, start_geom_index=0):
    """Given geographical (lon,lat) points get a mask to apply when filtering.

    Loads the (single multi-polygon) geometry from *shapefile*, projects the
    lon/lat points into its CRS and returns the boolean containment mask.
    """
    lons, lats = lonlats
    logger.debug("Getting the global mask from file: shapefile file path = %s", str(shapefile))

    shape_geom = ShapeGeometry(shapefile)
    shape_geom.load()
    # There is only one geometry/multi-polygon!
    geometry = shape_geom.geometries[0]

    # Project lon/lat into the shapefile CRS before point-in-polygon testing.
    proj = pyproj.Proj(shape_geom.proj4str)
    metersx, metersy = proj(lons, lats)
    projected_points = np.vstack([metersx, metersy]).T

    return get_mask_from_multipolygon(projected_points, geometry, start_geom_index)
334

335

336
class ActiveFiresPostprocessing(Thread):
    """The active fires post processor.

    A posttroll-driven thread: listens for messages about new CSPP VIIRS-AF
    files, filters the detections against national borders and false-alarm
    masks, tags each detection with a unique day-id, stores geojson output
    (national and optionally regional), and publishes messages about the
    produced files.
    """

    def __init__(self, configfile, shp_borders, shp_mask, regional_filtermask=None):
        """Initialize the active fires post processor class.

        Args:
            configfile: Path to the yaml configuration file.
            shp_borders: Shapefile with national borders (keep detections inside).
            shp_mask: Shapefile with known false-alarm areas (drop detections inside).
            regional_filtermask: Optional shapefile for regional filtering/output.
        """
        super().__init__()
        self.shp_borders = shp_borders
        self.shp_filtermask = shp_mask

        self.regional_filtermask = regional_filtermask
        self.configfile = configfile
        self.options = {}

        config = read_config(self.configfile)
        self._set_options_from_config(config)

        self.host = socket.gethostname()
        self.timezone = self.options.get('timezone', 'GMT')

        self.input_topic = self.options['subscribe_topics'][0]
        self.output_topic = self.options['publish_topic']
        self.infile_pattern = self.options.get('af_pattern_ibands')
        self.outfile_pattern_national = self.options.get('geojson_file_pattern_national')
        self.outfile_pattern_regional = self.options.get('geojson_file_pattern_regional')
        self.output_dir = self.options.get('output_dir', '/tmp')
        self.filepath_detection_id_cache = self.options.get('filepath_detection_id_cache')

        frmt = self.options['regional_shapefiles_format']
        self.regional_shapefiles_globstr = globify(frmt)

        # Running detection-id state: {'date': datetime, 'counter': int}.
        self._fire_detection_id = None
        self._initialize_fire_detection_id()

        self.listener = None
        self.publisher = None
        self.loop = False
        self._setup_and_start_communication()

    def _setup_and_start_communication(self):
        """Set up the Posttroll communication and start the publisher."""
        logger.debug("Starting up... Input topic: %s", self.input_topic)
        now = datetime_utc2local(datetime.now(), self.timezone)
        logger.debug("Output times for timezone: {zone} Now = {time}".format(zone=str(self.timezone), time=now))

        self._check_borders_shapes_exists()

        self.listener = ListenerContainer(topics=[self.input_topic])
        self.publisher = NoisyPublisher("active_fires_postprocessing")
        self.publisher.start()
        self.loop = True
        # Allow a clean shutdown on SIGTERM.
        signal.signal(signal.SIGTERM, self.signal_shutdown)

    def _set_options_from_config(self, config):
        """From the configuration on disk set the option dictionary, holding all metadata for processing."""
        for item in config:
            if not isinstance(config[item], dict):
                self.options[item] = config[item]

        # Comma-separated topic strings are normalized to lists, dropping
        # empty entries.
        if isinstance(self.options.get('subscribe_topics'), str):
            subscribe_topics = self.options.get('subscribe_topics').split(',')
            for item in subscribe_topics:
                if len(item) == 0:
                    subscribe_topics.remove(item)
            self.options['subscribe_topics'] = subscribe_topics

        if isinstance(self.options.get('publish_topics'), str):
            publish_topics = self.options.get('publish_topics').split(',')
            for item in publish_topics:
                if len(item) == 0:
                    publish_topics.remove(item)
            self.options['publish_topics'] = publish_topics

    def signal_shutdown(self, *args, **kwargs):
        """Shutdown the Active Fires postprocessing."""
        self.close()

    def check_incoming_message_and_get_filename(self, msg):
        """Check the message content and return filename if okay.

        Returns None (possibly after publishing no-fires messages) when the
        message type is unsupported, the file is missing, or the file type is
        not a txt AF file.
        """
        if msg.type not in ['file', 'collection', 'dataset']:
            logger.debug("Message type not supported: %s", str(msg.type))
            return None

        filename = get_filename_from_uri(msg.data.get('uri'))
        if not os.path.exists(filename):
            logger.warning("File does not exist: %s", filename)
            return None

        file_ok = check_file_type_okay(msg.data.get('type'))
        if not file_ok:
            # Not an AF txt file: announce "no fires" rather than fail silently.
            output_messages = self._generate_no_fires_messages(msg, NO_FIRES_TEXT)
            for output_msg in output_messages:
                logger.debug("Sending message: %s", str(output_msg))
                self.publisher.send(str(output_msg))
            return None

        return filename

    def do_postprocessing_on_message(self, msg, filename):
        """Do the fires post processing on a message.

        Reads the detections, applies national and false-alarm filtering,
        assigns unique detection ids, writes geojson output and publishes
        national (and optionally regional) messages.
        """
        platform_name = msg.data.get('platform_name')
        af_shapeff = ActiveFiresShapefileFiltering(filename, platform_name=platform_name,
                                                   timezone=self.timezone)
        afdata = af_shapeff.get_af_data(self.infile_pattern)
        if len(afdata) == 0:
            output_messages = self._generate_no_fires_messages(msg, NO_FIRES_TEXT)
            for output_msg in output_messages:
                logger.debug("Sending message: %s", str(output_msg))
                self.publisher.send(str(output_msg))
            return

        afdata = self.fires_filtering(msg, af_shapeff)
        logger.debug("After fires_filtering...: Number of fire detections = %d", len(afdata))
        if len(afdata) == 0:
            logger.debug("No fires - so no regional filtering to be done!")
            return

        # It is here that we should add a unique day-ID to each of the detections!
        afdata = self.add_unique_day_id(afdata)
        # Persist the counter so ids stay unique across restarts.
        self.save_id_to_file()

        # 1) Create geojson feature collection
        # 2) Dump geojson data to disk
        feature_collection = geojson_feature_collection_from_detections(afdata,
                                                                        platform_name=af_shapeff.platform_name)

        fmda = af_shapeff.metadata
        pout = Parser(self.outfile_pattern_national)
        out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
        logger.debug("Output file path = %s", out_filepath)

        if feature_collection is None:
            logger.info("No geojson file created, number of fires after filtering = %d", len(afdata))
            output_messages = self._generate_no_fires_messages(msg,
                                                               'No true fire detections inside National borders')  # noqa
        else:
            store_geojson(out_filepath, feature_collection)
            output_messages = self.get_output_messages(out_filepath, msg, len(afdata))

        for output_msg in output_messages:
            if output_msg:
                logger.debug("Sending message: %s", str(output_msg))
                self.publisher.send(str(output_msg))

        # Do the regional filtering now:
        if not self.regional_filtermask:
            logger.info("No regional filtering is attempted.")
            return

        # FIXME! If afdata is empty (len=0) then it seems all data are inside all regions!
        af_shapeff = ActiveFiresShapefileFiltering(afdata=afdata, platform_name=platform_name,
                                                   timezone=self.timezone)
        regional_fmask = af_shapeff.get_regional_filtermasks(self.regional_filtermask,
                                                             globstr=self.regional_shapefiles_globstr)
        regional_messages = self.regional_fires_filtering_and_publishing(msg, regional_fmask, af_shapeff)
        for region_msg in regional_messages:
            logger.debug("Sending message: %s", str(region_msg))
            self.publisher.send(str(region_msg))

    def run(self):
        """Run the AF post processing."""
        while self.loop:
            try:
                msg = self.listener.output_queue.get(timeout=1)
                logger.debug("Message: %s", str(msg.data))
            except Empty:
                continue
            else:
                filename = self.check_incoming_message_and_get_filename(msg)
                if not filename:
                    continue

                self.do_postprocessing_on_message(msg, filename)

    def regional_fires_filtering_and_publishing(self, msg, regional_fmask, afsff_obj):
        """From the regional-fires-filter-mask and the fire detection data send regional messages.

        For every region with at least one detection, writes a regional
        geojson file and builds an output message. Returns the list of
        messages to publish.
        """
        logger.debug("Perform regional masking on VIIRS AF detections and publish accordingly.")

        afdata = afsff_obj.get_af_data()
        fmda = afsff_obj.metadata

        fmda['platform'] = afsff_obj.platform_name

        pout = Parser(self.outfile_pattern_regional)

        output_messages = []
        regions_with_detections = 0
        for region_name in regional_fmask:
            if not regional_fmask[region_name]['some_inside_test_area']:
                continue

            regions_with_detections = regions_with_detections + 1
            fmda['region_name'] = regional_fmask[region_name]['attributes']['Kod_omr']

            out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
            logger.debug("Output file path = %s", out_filepath)
            data_in_region = afdata[regional_fmask[region_name]['mask']]

            feature_collection = geojson_feature_collection_from_detections(data_in_region,
                                                                            platform_name=fmda['platform'])
            if feature_collection is None:
                logger.warning("Something wrong happended storing regional " +
                               "data to Geojson - area: {name}".format(name=str(region_name)))
                continue

            store_geojson(out_filepath, feature_collection)

            outmsg = self._generate_output_message(out_filepath, msg, regional_fmask[region_name])
            output_messages.append(outmsg)
            logger.info("Geojson file created! Number of fires in region = %d", len(data_in_region))

        logger.debug("Regional masking done. Number of regions with fire " +
                     "detections on this granule: %s" % str(regions_with_detections))
        return output_messages

    def fires_filtering(self, msg, af_shapeff):
        """Read Active Fire data and perform spatial filtering removing false detections.

        Do the national filtering first, and then filter out potential false
        detections by the special mask for that.

        """
        logger.debug("Read VIIRS AF detections and perform quality control and spatial filtering")

        fmda = af_shapeff.metadata
        # metadata contains time and everything but it is not being transferred to the dataframe.attrs

        pout = Parser(self.outfile_pattern_national)
        out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
        logger.debug("Output file path = %s", out_filepath)

        # National filtering:
        af_shapeff.fires_filtering(self.shp_borders)

        # Metadata should be transferred here!
        afdata_ff = af_shapeff.get_af_data()

        if len(afdata_ff) > 0:
            # Drop detections inside the known false-alarm mask.
            logger.debug("Doing the fires filtering: shapefile-mask = %s", str(self.shp_filtermask))
            af_shapeff.fires_filtering(self.shp_filtermask, start_geometries_index=0, inside=False)
            afdata_ff = af_shapeff.get_af_data()
            logger.debug("After fires_filtering: Number of fire detections left: %d", len(afdata_ff))

        return afdata_ff

    def get_output_messages(self, filepath, msg, number_of_data):
        """Generate the adequate output message(s) depending on if an output file was created or not."""
        logger.info("Geojson file created! Number of fires = %d", number_of_data)
        return [self._generate_output_message(filepath, msg)]

    def _generate_output_message(self, filepath, input_msg, region=None):
        """Create the output message to publish."""
        output_topic = generate_posttroll_topic(self.output_topic, region)
        to_send = prepare_posttroll_message(input_msg, region)
        to_send['uri'] = ('ssh://%s/%s' % (self.host, filepath))
        to_send['uid'] = os.path.basename(filepath)
        to_send['type'] = 'GEOJSON-filtered'
        to_send['format'] = 'geojson'
        to_send['product'] = 'afimg'
        pubmsg = Message(output_topic, 'file', to_send)
        return pubmsg

    def _generate_no_fires_messages(self, input_msg, msg_string):
        """Create the output messages to publish.

        One 'info' message is produced per output scope (National and Regional).
        """
        to_send = prepare_posttroll_message(input_msg)
        to_send['info'] = msg_string
        publish_messages = []
        for ext in ['National', 'Regional']:
            topic = self.output_topic + '/' + ext
            publish_messages.append(Message(topic, 'info', to_send))

        return publish_messages

    def _check_borders_shapes_exists(self):
        """Check that the national borders shapefile exists on disk."""
        if not os.path.exists(self.shp_borders):
            raise OSError("Shape file does not exist! Filename = %s" % self.shp_borders)

    def _initialize_fire_detection_id(self):
        """Initialize the fire detection ID.

        Restores the last persisted id from the cache file when available,
        otherwise starts a fresh counter for today.
        """
        if self.filepath_detection_id_cache and os.path.exists(self.filepath_detection_id_cache):
            self._fire_detection_id = self.get_id_from_file()
        else:
            self._fire_detection_id = {'date': datetime.utcnow(), 'counter': 0}

    def update_fire_detection_id(self):
        """Update the fire detection ID registry.

        Resets the counter when a new (UTC) day has started, then increments
        the running counter.
        """
        now = datetime.utcnow()
        tdelta = now - self._fire_detection_id['date']
        if tdelta.total_seconds() > 24*3600:
            self._initialize_fire_detection_id()
        elif tdelta.total_seconds() > 0 and self._fire_detection_id['date'].day != now.day:
            self._initialize_fire_detection_id()

        self._fire_detection_id['counter'] = self._fire_detection_id['counter'] + 1

    def save_id_to_file(self):
        """Save the (current) detection id on disk."""
        with open(self.filepath_detection_id_cache, 'w') as fpt:
            id_ = self._create_id_string()
            fpt.write(id_)

    def get_id_from_file(self):
        """Read the latest stored detection id string from disk and convert to internal format."""
        with open(self.filepath_detection_id_cache, 'r') as fpt:
            idstr = fpt.read()

        return self._get_id_from_string(idstr)

    def _get_id_from_string(self, idstr):
        """Get the detection id from string (format: 'YYYYMMDD-counter')."""
        datestr, counter = idstr.split('-')
        return {'date': datetime.strptime(datestr, '%Y%m%d'),
                'counter': int(counter)}

    def _create_id_string(self):
        """From the internal fire detection id create the id string to be exposed to the user."""
        return (self._fire_detection_id['date'].strftime('%Y%m%d') +
                '-' + str(self._fire_detection_id['counter']))

    def add_unique_day_id(self, afdata):
        """Add a unique detection id - date + a running number for the day."""
        # Add id's to the detections:
        id_list = []
        for _i in range(len(afdata)):
            self.update_fire_detection_id()
            id_ = self._create_id_string()
            id_list.append(id_)

        afdata['detection_id'] = id_list
        return afdata

    def close(self):
        """Shutdown the Active Fires postprocessing."""
        logger.info('Terminating Active Fires post processing.')
        self.loop = False
        try:
            self.listener.stop()
        except Exception:
            logger.exception("Couldn't stop listener.")
        if self.publisher:
            try:
                self.publisher.stop()
            except Exception:
                logger.exception("Couldn't stop publisher.")
681

682

683
def check_file_type_okay(file_type):
    """Check if the file is of the correct type.

    Only 'txt'/'TXT' (the CSPP AF ascii output) is accepted.
    """
    if file_type in ['txt', 'TXT']:
        return True
    logger.info('File type not txt: %s', str(file_type))
    return False
689

690

691
def get_filename_from_uri(uri):
    """Get the file name from the uri given.

    Returns the path component of the URI (e.g. 'ssh://host/tmp/f.txt' -> '/tmp/f.txt').
    """
    logger.info('File uri: %s', str(uri))
    parsed = urlparse(uri)
    return parsed.path
696

697

698
def generate_posttroll_topic(output_topic, region=None):
    """Create the topic for the posttroll message to publish.

    Regional output gets a '/Regional/<region code>' suffix, national output
    a '/National' suffix.
    """
    if region:
        suffix = '/Regional/' + region['attributes']['Kod_omr']
    else:
        suffix = '/National'
    return output_topic + suffix
706

707

708
def prepare_posttroll_message(input_msg, region=None):
    """Create the basic posttroll-message fields and return.

    Copies the incoming message data, strips the file-specific keys (they are
    re-set by the caller for the outgoing product), and adds the region name
    and code when a region is given.
    """
    to_send = input_msg.data.copy()
    for key in ('dataset', 'collection', 'uri', 'uid', 'format', 'type'):
        to_send.pop(key, None)
    # FIXME! Check that the region_name is stored as a unicode string!
    if region:
        to_send['region_name'] = region['attributes']['Testomr']
        to_send['region_code'] = region['attributes']['Kod_omr']

    return to_send
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc