• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

adybbroe / activefires-pp / 5553257011

pending completion
5553257011

Pull #16

github

web-flow
Merge a79ebbdab into 46b894fa5
Pull Request #16: Feature add unique detection

259 of 413 branches covered (62.71%)

324 of 372 new or added lines in 6 files covered. (87.1%)

3 existing lines in 2 files now uncovered.

1893 of 2299 relevant lines covered (82.34%)

3.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.5
/activefires_pp/post_processing.py
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3

4
# Copyright (c) 2021 - 2023 Adam.Dybbro
5

6
# Author(s):
7

8
#   Adam Dybbroe <Firstname.Lastname@smhi.se>
9

10
# This program is free software: you can redistribute it and/or modify
11
# it under the terms of the GNU General Public License as published by
12
# the Free Software Foundation, either version 3 of the License, or
13
# (at your option) any later version.
14

15
# This program is distributed in the hope that it will be useful,
16
# but WITHOUT ANY WARRANTY; without even the implied warranty of
17
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18
# GNU General Public License for more details.
19

20
# You should have received a copy of the GNU General Public License
21
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
22

23
"""Post processing on the Active Fire detections."""
4✔
24

25
import socket
4✔
26
from trollsift import Parser, globify
4✔
27
import time
4✔
28
import pandas as pd
4✔
29
from datetime import datetime, timedelta
4✔
30
import numpy as np
4✔
31
import os
4✔
32
from six.moves.urllib.parse import urlparse
4✔
33
from geojson import Feature, Point, FeatureCollection, dump
4✔
34
import logging
4✔
35
import signal
4✔
36
from queue import Empty
4✔
37
from threading import Thread
4✔
38
from posttroll.listener import ListenerContainer
4✔
39
from posttroll.message import Message
4✔
40
from posttroll.publisher import NoisyPublisher
4✔
41
import pyproj
4✔
42
from matplotlib.path import Path
4✔
43
import shapely
4✔
44

45
from activefires_pp.utils import datetime_utc2local
4✔
46
from activefires_pp.utils import get_local_timezone_offset
4✔
47
from activefires_pp.utils import json_serial
4✔
48
from activefires_pp.config import read_config
4✔
49
from activefires_pp.geometries_from_shapefiles import ShapeGeometry
4✔
50

51
# M-band output:
52
# column 1: latitude of fire pixel (degrees)
53
# column 2: longitude of fire pixel (degrees)
54
# column 3: M13 brightness temperature of fire pixel (K)
55
# column 4: Along-scan fire pixel resolution (km)
56
# column 5: Along-track fire pixel resolution (km)
57
# column 6: detection confidence (%)
58
#           7=low, 8=nominal, 9=high (Imager Resolution)
59
#           0-100 % (Moderate Resolution)
60
# column 7: fire radiative power (MW)
61
# I-band output:
62
# column 1: latitude of fire pixel (degrees)
63
# column 2: longitude of fire pixel (degrees)
64
# column 3: I04 brightness temperature of fire pixel (K)
65
# column 4: Along-scan fire pixel resolution (km)
66
# column 5: Along-track fire pixel resolution (km)
67
# column 6: detection confidence ([7,8,9]->[lo,med,hi])
68
# column 7: fire radiative power (MW)
69
#
70
# Column names for the CSPP VIIRS-AF ascii output files (both I-band and
# M-band products share this layout - see the format description above).
COL_NAMES = ["latitude", "longitude", "tb", "along_scan_res", "along_track_res", "conf", "power"]

# Info-message text published when a granule holds no fire detections.
NO_FIRES_TEXT = 'No fire detections for this granule'


logger = logging.getLogger(__name__)
# Raise the "fiona" library's log level so only warnings and above get through.
logging.getLogger("fiona").setLevel(logging.WARNING)
77

78

79
class ActiveFiresShapefileFiltering(object):
    """Reading, filtering and writing Active Fire detections.

    Reading either the CSPP VIIRS AF output (txt files) or the Geojson formatted files.
    Filtering for static and false alarms, and/or simply on geographical regions.
    Data is stored in geojson format.
    """

    def __init__(self, filepath=None, afdata=None, platform_name=None, timezone='GMT'):
        """Initialize the ActiveFiresShapefileFiltering class.

        Either *filepath* (an ascii file to be read later via get_af_data) or
        *afdata* (an already loaded detections DataFrame) should be given.
        """
        self.input_filepath = filepath
        self._afdata = afdata
        if afdata is None:
            self.metadata = {}
        else:
            # Carry over any metadata already attached to the DataFrame:
            self.metadata = afdata.attrs

        self.timezone = timezone
        self.platform_name = platform_name

    def get_af_data(self, filepattern=None, localtime=True):
        """Read the Active Fire results from file - ascii formatted output from CSPP VIIRS-AF.

        Returns the detections DataFrame (possibly already held in memory),
        or None when no input file is available.
        """
        if self._afdata is not None:
            # Data already in memory - just make sure the attrs are
            # populated with the metadata instance attribute:
            self._afdata.attrs.update(self.metadata)
            return self._afdata

        if not self.input_filepath or not os.path.exists(self.input_filepath):
            # FIXME! Better to raise an exception!?
            # NOTE(review): returns None here (self._afdata is None) - callers
            # must be prepared for that.
            return self._afdata

        if not filepattern:
            raise AttributeError("file pattern must be provided in order to be able to read from file!")

        self.metadata = self._get_metadata_from_filename(filepattern)
        self._afdata = _read_data(self.input_filepath)
        self._add_start_and_end_time_to_active_fires_data(localtime)

        return self._afdata

    def _get_metadata_from_filename(self, infile_pattern):
        """From the filename retrieve the metadata such as satellite and sensing time."""
        return get_metadata_from_filename(infile_pattern, self.input_filepath)

    def _add_start_and_end_time_to_active_fires_data(self, localtime):
        """Add start and end time to active fires data.

        If *localtime* is True the granule times are converted to the
        instance's timezone, otherwise they are kept as GMT. The columns are
        stored as naive (tz-stripped) numpy datetime64 values.
        """
        if localtime:
            logger.info("Convert to local time zone!")
            starttime = datetime_utc2local(self.metadata['start_time'], self.timezone)
            endtime = datetime_utc2local(self.metadata['end_time'], self.timezone)
        else:
            starttime = datetime_utc2local(self.metadata['start_time'], 'GMT')
            endtime = datetime_utc2local(self.metadata['end_time'], 'GMT')

        # Strip the tz info so the values can be stored as numpy datetime64:
        starttime = starttime.replace(tzinfo=None)
        endtime = endtime.replace(tzinfo=None)

        # Every detection in the granule gets the same start/end time:
        self._afdata['starttime'] = np.repeat(starttime, len(self._afdata)).astype(np.datetime64)
        self._afdata['endtime'] = np.repeat(endtime, len(self._afdata)).astype(np.datetime64)

        logger.info('Start and end times: %s %s',
                    str(self._afdata['starttime'][0]),
                    str(self._afdata['endtime'][0]))

    def _apply_timezone_offset(self, obstime):
        """Apply the time zone offset to the datetime objects."""
        # NOTE(review): not called anywhere in this module.
        obstime_offset = get_local_timezone_offset(self.timezone)
        return np.repeat(obstime.replace(tzinfo=None) + obstime_offset,
                         len(self._afdata)).astype(np.datetime64)

    def fires_filtering(self, shapefile, start_geometries_index=1, inside=True):
        """Remove fires outside National borders or filter out potential false detections.

        If *inside* is True the filtering will keep those detections that are inside the polygon.
        If *inside* is False the filtering will disregard the detections that are inside the polygon.
        """
        detections = self._afdata

        lons = detections.longitude.values
        lats = detections.latitude.values

        toc = time.time()
        points_inside = get_global_mask_from_shapefile(shapefile, (lons, lats), start_geometries_index)
        logger.debug("Time used checking inside polygon - mpl path method: %f", time.time() - toc)

        # Keep (or drop, depending on *inside*) the detections flagged by the mask:
        self._afdata = detections[points_inside == inside]

        if len(self._afdata) == 0:
            logger.debug("No fires after filtering on Polygon...")
        else:
            logger.debug("Number of detections after filtering on Polygon: %d", len(self._afdata))

    def get_regional_filtermasks(self, shapefile, globstr):
        """Get the regional filter masks from the shapefile.

        Returns a dict keyed on the region name ('Testomr' attribute) where
        each entry holds the boolean point mask, the shapefile attributes and
        two summary flags: 'all_inside_test_area' / 'some_inside_test_area'.
        """
        detections = self._afdata

        lons = detections.longitude.values
        lats = detections.latitude.values

        logger.debug("Before ShapeGeometry instance - shapefile name = %s" % str(shapefile))
        logger.debug("Shape file glob-string = %s" % str(globstr))
        shape_geom = ShapeGeometry(shapefile, globstr)
        shape_geom.load()

        # Project lon/lat detections into the shapefile's coordinate system:
        p__ = pyproj.Proj(shape_geom.proj4str)
        metersx, metersy = p__(lons, lats)
        points = np.vstack([metersx, metersy]).T

        regional_masks = {}

        for attr, geometry in zip(shape_geom.attributes, shape_geom.geometries):
            test_omr = attr['Testomr']
            all_inside_test_omr = False
            some_inside_test_omr = False
            logger.debug(u'Test area: {}'.format(str(test_omr)))

            regional_masks[test_omr] = {'mask': None, 'attributes': attr}

            if isinstance(geometry, shapely.geometry.multipolygon.MultiPolygon):
                regional_masks[test_omr]['mask'] = get_mask_from_multipolygon(points, geometry)
            else:
                # Single polygon - one matplotlib Path containment test suffices:
                shape = geometry
                pth = Path(shape.exterior.coords)
                regional_masks[test_omr]['mask'] = pth.contains_points(points)

            if sum(regional_masks[test_omr]['mask']) == len(points):
                all_inside_test_omr = True
                some_inside_test_omr = True
                logger.debug("All points inside test area!")
            elif sum(regional_masks[test_omr]['mask']) > 0:
                some_inside_test_omr = True
                logger.debug("Some points inside test area!")

            regional_masks[test_omr]['all_inside_test_area'] = all_inside_test_omr
            regional_masks[test_omr]['some_inside_test_area'] = some_inside_test_omr

        return regional_masks
216

217

218
def _read_data(filepath):
    """Load the CSPP VIIRS-AF ascii output into a pandas DataFrame.

    Lines starting with '#' are treated as comments and the columns are
    labelled according to the module-level COL_NAMES.
    """
    with open(filepath, 'r') as file_handle:
        frame = pd.read_csv(file_handle,
                            index_col=None,
                            header=None,
                            comment='#',
                            names=COL_NAMES)
    return frame
222

223

224
def get_metadata_from_filename(infile_pattern, filepath):
    """From the filename and its pattern get basic metadata of the satellite observations.

    Args:
        infile_pattern: trollsift pattern string describing the file name.
        filepath: path to the data file; only the basename is parsed.

    Returns:
        A dict with the fields parsed from the filename, including a
        corrected 'end_time', or None if the filename does not match
        the pattern.
    """
    p__ = Parser(infile_pattern)
    fname = os.path.basename(filepath)
    try:
        res = p__.parse(fname)
    except ValueError:
        # Don't fail hard, but make the mismatch visible instead of
        # silently returning None:
        logger.warning("Filename %s does not match the pattern %s - no metadata extracted!",
                       fname, infile_pattern)
        return None

    # The pattern only carries the end hour-of-day; combine it with the
    # start date to build a complete end time:
    endtime = datetime(res['start_time'].year, res['start_time'].month,
                       res['start_time'].day, res['end_hour'].hour, res['end_hour'].minute,
                       res['end_hour'].second)
    # If the end time appears to precede the start time, the granule has
    # passed midnight and the end time belongs to the next day:
    if endtime < res['start_time']:
        endtime = endtime + timedelta(days=1)

    res['end_time'] = endtime

    return res
244

245

246
def store(output_filename, detections):
    """Store the filtered AF detections on disk as csv.

    Returns the output filename, or None when there was nothing to write.
    """
    if len(detections) == 0:
        logger.debug("No detections to save!")
        return None

    detections.to_csv(output_filename, index=False)
    return output_filename
254

255

256
def geojson_feature_collection_from_detections(detections, platform_name=None):
    """Create the Geojson feature collection from fire detection data.

    Each detection becomes a Point feature carrying power, brightness
    temperature, confidence, detection id and the mean granule time.
    Returns None when *detections* is empty.
    """
    if len(detections) == 0:
        logger.debug("No detections to save!")
        return None

    # Convert points to GeoJSON
    features = []
    for idx in range(len(detections)):
        row = detections.iloc[idx]
        start = row.starttime.to_pydatetime()
        end = row.endtime.to_pydatetime()
        # The observation time is taken as the middle of the granule:
        mean_granule_time = start + (end - start) / 2.

        properties = {'power': row.power,
                      'tb': row.tb,
                      'confidence': int(row.conf),
                      'id': row.detection_id,
                      'observation_time': json_serial(mean_granule_time)
                      }
        if platform_name:
            properties['platform_name'] = platform_name
        else:
            logger.debug("No platform name specified for output")

        point = Point(map(float, [row.longitude, row.latitude]))
        features.append(Feature(geometry=point, properties=properties))

    return FeatureCollection(features)
287

288

289
def store_geojson(output_filename, feature_collection):
    """Store the Geojson feature collection of fire detections on disk.

    The output directory is created if it does not already exist.
    """
    path = os.path.dirname(output_filename)
    # A bare filename gives an empty dirname, and os.makedirs('') would
    # raise - only try to create a directory when there is one:
    if path and not os.path.exists(path):
        logger.info("Create directory: %s", path)
        os.makedirs(path)

    with open(output_filename, 'w') as fpt:
        dump(feature_collection, fpt)
298

299

300
def get_mask_from_multipolygon(points, geometry, start_idx=1):
    """Get mask for points from a shapely Multipolygon.

    The first polygon of *geometry* is checked first; remaining polygons
    (from *start_idx* onwards) are OR-ed in one by one, stopping early as
    soon as every point is covered.
    """
    first_shape = geometry.geoms[0]
    mask = Path(first_shape.exterior.coords).contains_points(points)

    if sum(mask) == len(points):
        # Every point is already inside the first polygon - done.
        return mask

    remaining = geometry.geoms[start_idx:]
    for shape in remaining.geoms:
        contained = Path(shape.exterior.coords).contains_points(points)
        mask = np.logical_or(mask, contained)
        if sum(mask) == len(points):
            break

    return mask
317

318

319
def get_global_mask_from_shapefile(shapefile, lonlats, start_geom_index=0):
    """Given geographical (lon,lat) points get a mask to apply when filtering."""
    lons, lats = lonlats
    logger.debug("Getting the global mask from file: shapefile file path = %s", str(shapefile))

    shape_geom = ShapeGeometry(shapefile)
    shape_geom.load()

    # There is only one geometry/multi-polygon!
    geometry = shape_geom.geometries[0]

    # Project the lon/lat points into the shapefile's coordinate system:
    proj = pyproj.Proj(shape_geom.proj4str)
    metersx, metersy = proj(lons, lats)
    points = np.vstack([metersx, metersy]).T

    return get_mask_from_multipolygon(points, geometry, start_geom_index)
335

336

337
class ActiveFiresPostprocessing(Thread):
    """The active fires post processor.

    A posttroll-driven thread: listens for messages announcing CSPP VIIRS-AF
    output files, filters the detections spatially (national borders, false
    alarm mask, optional regional masks), stores geojson results and
    publishes messages about them.
    """

    def __init__(self, configfile, shp_borders, shp_mask, regional_filtermask=None):
        """Initialize the active fires post processor class.

        Args:
            configfile: path to the yaml configuration read by read_config.
            shp_borders: shapefile with the national borders.
            shp_mask: shapefile masking out potential false detections.
            regional_filtermask: optional shapefile for regional filtering.
        """
        super().__init__()
        self.shp_borders = shp_borders
        self.shp_filtermask = shp_mask

        self.regional_filtermask = regional_filtermask
        self.configfile = configfile
        self.options = {}

        config = read_config(self.configfile)
        self._set_options_from_config(config)

        self.host = socket.gethostname()
        self.timezone = self.options.get('timezone', 'GMT')

        self.input_topic = self.options['subscribe_topics'][0]
        self.output_topic = self.options['publish_topic']
        self.infile_pattern = self.options.get('af_pattern_ibands')
        self.outfile_pattern_national = self.options.get('geojson_file_pattern_national')
        self.outfile_pattern_regional = self.options.get('geojson_file_pattern_regional')
        self.output_dir = self.options.get('output_dir', '/tmp')
        self.filepath_detection_id_cache = self.options.get('filepath_detection_id_cache')

        frmt = self.options['regional_shapefiles_format']
        self.regional_shapefiles_globstr = globify(frmt)

        self._fire_detection_id = None
        self._initialize_fire_detection_id()

        self.listener = None
        self.publisher = None
        self.loop = False
        self._setup_and_start_communication()

    def _setup_and_start_communication(self):
        """Set up the Posttroll communication and start the publisher."""
        logger.debug("Starting up... Input topic: %s", self.input_topic)
        now = datetime_utc2local(datetime.now(), self.timezone)
        logger.debug("Output times for timezone: {zone} Now = {time}".format(zone=str(self.timezone), time=now))

        self._check_borders_shapes_exists()

        self.listener = ListenerContainer(topics=[self.input_topic])
        self.publisher = NoisyPublisher("active_fires_postprocessing")
        self.publisher.start()
        self.loop = True
        # Make sure a SIGTERM triggers a clean shutdown:
        signal.signal(signal.SIGTERM, self.signal_shutdown)

    def _set_options_from_config(self, config):
        """From the configuration on disk set the option dictionary, holding all metadata for processing."""
        # Only take the flat (non-dict) config entries:
        for item in config:
            if not isinstance(config[item], dict):
                self.options[item] = config[item]

        # Comma-separated topic strings are split into lists, dropping
        # empty entries:
        if isinstance(self.options.get('subscribe_topics'), str):
            subscribe_topics = self.options.get('subscribe_topics').split(',')
            for item in subscribe_topics:
                if len(item) == 0:
                    subscribe_topics.remove(item)
            self.options['subscribe_topics'] = subscribe_topics

        if isinstance(self.options.get('publish_topics'), str):
            publish_topics = self.options.get('publish_topics').split(',')
            for item in publish_topics:
                if len(item) == 0:
                    publish_topics.remove(item)
            self.options['publish_topics'] = publish_topics

    def signal_shutdown(self, *args, **kwargs):
        """Shutdown the Active Fires postprocessing."""
        self.close()

    def check_incoming_message_and_get_filename(self, msg):
        """Check the message content and return filename if okay.

        Returns None (after publishing 'no fires' messages where relevant)
        when the message cannot be processed.
        """
        if msg.type not in ['file', 'collection', 'dataset']:
            logger.debug("Message type not supported: %s", str(msg.type))
            return None

        filename = get_filename_from_uri(msg.data.get('uri'))
        if not os.path.exists(filename):
            logger.warning("File does not exist: %s", filename)
            return None

        file_ok = check_file_type_okay(msg.data.get('type'))
        if not file_ok:
            # Wrong file type means no fire data - inform downstream anyway:
            output_messages = self._generate_no_fires_messages(msg, NO_FIRES_TEXT)
            for output_msg in output_messages:
                logger.debug("Sending message: %s", str(output_msg))
                self.publisher.send(str(output_msg))
            return None

        return filename

    def do_postprocessing_on_message(self, msg, filename):
        """Do the fires post processing on a message."""
        platform_name = msg.data.get('platform_name')
        af_shapeff = ActiveFiresShapefileFiltering(filename, platform_name=platform_name,
                                                   timezone=self.timezone)
        afdata = af_shapeff.get_af_data(self.infile_pattern)
        if len(afdata) == 0:
            output_messages = self._generate_no_fires_messages(msg, NO_FIRES_TEXT)
            for output_msg in output_messages:
                logger.debug("Sending message: %s", str(output_msg))
                self.publisher.send(str(output_msg))
            return

        afdata = self.fires_filtering(msg, af_shapeff)
        logger.debug("After fires_filtering...: Number of fire detections = %d", len(afdata))
        if len(afdata) == 0:
            logger.debug("No fires - so no regional filtering to be done!")
            return

        # It is here that we should add a unique day-ID to each of the detections!
        afdata = self.add_unique_day_id(afdata)
        self.save_id_to_file()

        # 1) Create geojson feature collection
        # 2) Dump geojson data to disk
        feature_collection = geojson_feature_collection_from_detections(afdata,
                                                                        platform_name=af_shapeff.platform_name)

        fmda = af_shapeff.metadata
        pout = Parser(self.outfile_pattern_national)
        out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
        logger.debug("Output file path = %s", out_filepath)

        if feature_collection is None:
            logger.info("No geojson file created, number of fires after filtering = %d", len(afdata))
            output_messages = self._generate_no_fires_messages(msg,
                                                               'No true fire detections inside National borders')  # noqa
        else:
            store_geojson(out_filepath, feature_collection)
            output_messages = self.get_output_messages(out_filepath, msg, len(afdata))

        for output_msg in output_messages:
            if output_msg:
                logger.debug("Sending message: %s", str(output_msg))
                self.publisher.send(str(output_msg))

        # Do the regional filtering now:
        if not self.regional_filtermask:
            logger.info("No regional filtering is attempted.")
            return

        # FIXME! If afdata is empty (len=0) then it seems all data are inside all regions!
        af_shapeff = ActiveFiresShapefileFiltering(afdata=afdata, platform_name=platform_name,
                                                   timezone=self.timezone)
        regional_fmask = af_shapeff.get_regional_filtermasks(self.regional_filtermask,
                                                             globstr=self.regional_shapefiles_globstr)
        regional_messages = self.regional_fires_filtering_and_publishing(msg, regional_fmask, af_shapeff)
        for region_msg in regional_messages:
            logger.debug("Sending message: %s", str(region_msg))
            self.publisher.send(str(region_msg))

    def run(self):
        """Run the AF post processing.

        Main loop: poll the listener queue and process each message until
        self.loop is cleared by close().
        """
        while self.loop:
            try:
                msg = self.listener.output_queue.get(timeout=1)
                logger.debug("Message: %s", str(msg.data))
            except Empty:
                continue
            else:
                filename = self.check_incoming_message_and_get_filename(msg)
                if not filename:
                    continue

                self.do_postprocessing_on_message(msg, filename)

    def regional_fires_filtering_and_publishing(self, msg, regional_fmask, afsff_obj):
        """From the regional-fires-filter-mask and the fire detection data send regional messages."""
        logger.debug("Perform regional masking on VIIRS AF detections and publish accordingly.")

        afdata = afsff_obj.get_af_data()
        fmda = afsff_obj.metadata

        fmda['platform'] = afsff_obj.platform_name

        pout = Parser(self.outfile_pattern_regional)

        output_messages = []
        regions_with_detections = 0
        for region_name in regional_fmask:
            # Skip regions without any detections at all:
            if not regional_fmask[region_name]['some_inside_test_area']:
                continue

            regions_with_detections = regions_with_detections + 1
            fmda['region_name'] = regional_fmask[region_name]['attributes']['Kod_omr']

            out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
            logger.debug("Output file path = %s", out_filepath)
            data_in_region = afdata[regional_fmask[region_name]['mask']]

            # filepath = store_geojson(out_filepath, data_in_region, platform_name=fmda['platform'])
            feature_collection = geojson_feature_collection_from_detections(data_in_region,
                                                                            platform_name=fmda['platform'])
            if feature_collection is None:
                logger.warning("Something wrong happended storing regional " +
                               "data to Geojson - area: {name}".format(name=str(region_name)))
                continue

            store_geojson(out_filepath, feature_collection)

            outmsg = self._generate_output_message(out_filepath, msg, regional_fmask[region_name])
            output_messages.append(outmsg)
            logger.info("Geojson file created! Number of fires in region = %d", len(data_in_region))

        logger.debug("Regional masking done. Number of regions with fire " +
                     "detections on this granule: %s" % str(regions_with_detections))
        return output_messages

    def fires_filtering(self, msg, af_shapeff):
        """Read Active Fire data and perform spatial filtering removing false detections.

        Do the national filtering first, and then filter out potential false
        detections by the special mask for that.

        """
        logger.debug("Read VIIRS AF detections and perform quality control and spatial filtering")

        fmda = af_shapeff.metadata
        # NOTE: metadata contains time etc. but it is not being transferred
        # to the dataframe.attrs

        pout = Parser(self.outfile_pattern_national)
        out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
        logger.debug("Output file path = %s", out_filepath)

        # National filtering:
        af_shapeff.fires_filtering(self.shp_borders)

        # Metadata should be transferred here!
        afdata_ff = af_shapeff.get_af_data()

        if len(afdata_ff) > 0:
            # False-alarm filtering: drop detections *inside* the mask polygons:
            logger.debug("Doing the fires filtering: shapefile-mask = %s", str(self.shp_filtermask))
            af_shapeff.fires_filtering(self.shp_filtermask, start_geometries_index=0, inside=False)
            afdata_ff = af_shapeff.get_af_data()
            logger.debug("After fires_filtering: Number of fire detections left: %d", len(afdata_ff))

        return afdata_ff

    def get_output_messages(self, filepath, msg, number_of_data):
        """Generate the adequate output message(s) depending on if an output file was created or not."""
        logger.info("Geojson file created! Number of fires = %d", number_of_data)
        return [self._generate_output_message(filepath, msg)]

    def _generate_output_message(self, filepath, input_msg, region=None):
        """Create the output message to publish."""
        output_topic = generate_posttroll_topic(self.output_topic, region)
        to_send = prepare_posttroll_message(input_msg, region)
        to_send['uri'] = ('ssh://%s/%s' % (self.host, filepath))
        to_send['uid'] = os.path.basename(filepath)
        to_send['type'] = 'GEOJSON-filtered'
        to_send['format'] = 'geojson'
        to_send['product'] = 'afimg'
        pubmsg = Message(output_topic, 'file', to_send)
        return pubmsg

    def _generate_no_fires_messages(self, input_msg, msg_string):
        """Create the output messages to publish.

        One info-message per output branch (National and Regional).
        """
        to_send = prepare_posttroll_message(input_msg)
        to_send['info'] = msg_string
        publish_messages = []
        for ext in ['National', 'Regional']:
            topic = self.output_topic + '/' + ext
            publish_messages.append(Message(topic, 'info', to_send))

        return publish_messages

    def _check_borders_shapes_exists(self):
        """Check that the national borders shapefile exists on disk."""
        if not os.path.exists(self.shp_borders):
            raise OSError("Shape file does not exist! Filename = %s" % self.shp_borders)

    def _initialize_fire_detection_id(self):
        """Initialize the fire detection ID.

        Read the last id from the cache file if available, otherwise start a
        fresh counter for today.
        """
        if self.filepath_detection_id_cache and os.path.exists(self.filepath_detection_id_cache):
            self._fire_detection_id = self.get_id_from_file()
        else:
            # NOTE(review): datetime.utcnow() is deprecated from Python 3.12
            # - consider datetime.now(timezone.utc) when dropping old versions.
            self._fire_detection_id = {'date': datetime.utcnow(), 'counter': 0}

    def update_fire_detection_id(self):
        """Update the fire detection ID registry.

        The counter is reset (re-initialized) when a day has passed or the
        calendar day has changed, then always incremented by one.
        """
        now = datetime.utcnow()
        tdelta = now - self._fire_detection_id['date']
        if tdelta.total_seconds() > 24*3600:
            self._initialize_fire_detection_id()
        elif tdelta.total_seconds() > 0 and self._fire_detection_id['date'].day != now.day:
            self._initialize_fire_detection_id()

        self._fire_detection_id['counter'] = self._fire_detection_id['counter'] + 1

    def save_id_to_file(self):
        """Save the (current) detection id on disk.

        It is assumed that the user permissions are so that a file can actually
        be written to disk here!
        """
        with open(self.filepath_detection_id_cache, 'w') as fpt:
            id_ = self._create_id_string()
            fpt.write(id_)

    def get_id_from_file(self):
        """Read the latest stored detection id string from disk and convert to internal format."""
        with open(self.filepath_detection_id_cache, 'r') as fpt:
            idstr = fpt.read()

        return self._get_id_from_string(idstr)

    def _get_id_from_string(self, idstr):
        """Get the detection id from string.

        The string format is '<YYYYmmdd>-<counter>'.
        """
        datestr, counter = idstr.split('-')
        return {'date': datetime.strptime(datestr, '%Y%m%d'),
                'counter': int(counter)}

    def _create_id_string(self):
        """From the internal fire detection id create the id string to be exposed to the user."""
        return (self._fire_detection_id['date'].strftime('%Y%m%d') +
                '-' + str(self._fire_detection_id['counter']))

    def add_unique_day_id(self, afdata):
        """Add a unique detection id - date + a running number for the day."""
        # Add id's to the detections:
        id_list = []
        for _i in range(len(afdata)):
            self.update_fire_detection_id()
            id_ = self._create_id_string()
            id_list.append(id_)

        afdata['detection_id'] = id_list
        return afdata

    def close(self):
        """Shutdown the Active Fires postprocessing."""
        logger.info('Terminating Active Fires post processing.')
        self.loop = False
        # Persist the detection id so numbering continues after a restart:
        logger.info('Dumping the latest detection id to disk: %s', str(self.filepath_detection_id_cache))
        self.save_id_to_file()
        try:
            self.listener.stop()
        except Exception:
            logger.exception("Couldn't stop listener.")
        if self.publisher:
            try:
                self.publisher.stop()
            except Exception:
                logger.exception("Couldn't stop publisher.")
688

689

690
def check_file_type_okay(file_type):
    """Check if the file is of the correct type (txt)."""
    if file_type in ('txt', 'TXT'):
        return True
    logger.info('File type not txt: %s', str(file_type))
    return False
696

697

698
def get_filename_from_uri(uri):
    """Extract the local file path from the uri given."""
    logger.info('File uri: %s', str(uri))
    return urlparse(uri).path
703

704

705
def generate_posttroll_topic(output_topic, region=None):
    """Create the topic for the posttroll message to publish.

    With a region given the regional code is appended; otherwise the
    national suffix is used.
    """
    if not region:
        return output_topic + '/National'

    return output_topic + '/Regional/' + region['attributes']['Kod_omr']
713

714

715
def prepare_posttroll_message(input_msg, region=None):
    """Create the basic posttroll-message fields and return.

    Starts from a copy of the incoming message data, strips the fields that
    will be replaced (or are irrelevant) in the outgoing message, and adds
    the region name/code when a region is given.
    """
    to_send = input_msg.data.copy()
    for key in ('dataset', 'collection', 'uri', 'uid', 'format', 'type'):
        to_send.pop(key, None)

    # FIXME! Check that the region_name is stored as a unicode string!
    if region:
        to_send['region_name'] = region['attributes']['Testomr']
        to_send['region_code'] = region['attributes']['Kod_omr']

    return to_send
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc