• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

adybbroe / activefires-pp / 6085033300

05 Sep 2023 01:06PM UTC coverage: 83.035% (+0.7%) from 82.34%
6085033300

push

github

web-flow
Merge pull request #15 from adybbroe/add-unit-conversion-support

Add unit conversion support

263 of 411 branches covered (0.0%)

114 of 124 new or added lines in 6 files covered. (91.94%)

3 existing lines in 1 file now uncovered.

1997 of 2405 relevant lines covered (83.04%)

3.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.91
/activefires_pp/post_processing.py
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3

4
# Copyright (c) 2021 - 2023 Adam.Dybbroe
5

6
# Author(s):
7

8
#   Adam Dybbroe <Firstname.Lastname@smhi.se>
9

10
# This program is free software: you can redistribute it and/or modify
11
# it under the terms of the GNU General Public License as published by
12
# the Free Software Foundation, either version 3 of the License, or
13
# (at your option) any later version.
14

15
# This program is distributed in the hope that it will be useful,
16
# but WITHOUT ANY WARRANTY; without even the implied warranty of
17
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18
# GNU General Public License for more details.
19

20
# You should have received a copy of the GNU General Public License
21
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
22

23
"""Post processing on the Active Fire detections."""
4✔
24

25
import socket
4✔
26
from trollsift import Parser, globify
4✔
27
import time
4✔
28
import pandas as pd
4✔
29
from datetime import datetime, timedelta
4✔
30
import numpy as np
4✔
31
import os
4✔
32
from six.moves.urllib.parse import urlparse
4✔
33
from geojson import Feature, Point, FeatureCollection, dump
4✔
34
import logging
4✔
35
import signal
4✔
36
from queue import Empty
4✔
37
from threading import Thread
4✔
38
from posttroll.listener import ListenerContainer
4✔
39
from posttroll.message import Message
4✔
40
from posttroll.publisher import NoisyPublisher
4✔
41
import pyproj
4✔
42
from matplotlib.path import Path
4✔
43
import shapely
4✔
44

45
from activefires_pp.utils import datetime_utc2local
4✔
46
from activefires_pp.utils import UnitConverter
4✔
47
from activefires_pp.utils import get_local_timezone_offset
4✔
48
from activefires_pp.utils import json_serial
4✔
49
from activefires_pp.config import read_config
4✔
50
from activefires_pp.geometries_from_shapefiles import ShapeGeometry
4✔
51

52
# M-band output:
53
# column 1: latitude of fire pixel (degrees)
54
# column 2: longitude of fire pixel (degrees)
55
# column 3: M13 brightness temperature of fire pixel (K)
56
# column 4: Along-scan fire pixel resolution (km)
57
# column 5: Along-track fire pixel resolution (km)
58
# column 6: detection confidence (%)
59
#           7=low, 8=nominal, 9=high (Imager Resolution)
60
#           0-100 % (Moderate Resolution)
61
# column 7: fire radiative power (MW)
62
# I-band output:
63
# column 1: latitude of fire pixel (degrees)
64
# column 2: longitude of fire pixel (degrees)
65
# column 3: I04 brightness temperature of fire pixel (K)
66
# column 4: Along-scan fire pixel resolution (km)
67
# column 5: Along-track fire pixel resolution (km)
68
# column 6: detection confidence ([7,8,9]->[lo,med,hi])
69
# column 7: fire radiative power (MW)
70
#
71
# Column names for the CSPP VIIRS-AF ascii output files; the meaning of
# each column is described in the comment block above.
COL_NAMES = ["latitude", "longitude", "tb", "along_scan_res", "along_track_res", "conf", "power"]

# Info text published when a granule contains no fire detections.
NO_FIRES_TEXT = 'No fire detections for this granule'


logger = logging.getLogger(__name__)
# Fiona is very chatty on DEBUG level - keep it to warnings and above.
logging.getLogger("fiona").setLevel(logging.WARNING)
78

79

80
class ActiveFiresShapefileFiltering(object):
    """Reading, filtering and writing Active Fire detections.

    Reading either the CSPP VIIRS AF output (txt files) or the Geojson formatted files.
    Filtering for static and false alarms, and/or simply on geographical regions.
    Data is stored in geojson format.
    """

    def __init__(self, filepath=None, afdata=None, platform_name=None, timezone='GMT'):
        """Initialize the ActiveFiresShapefileFiltering class.

        Args:
            filepath: Path to a CSPP VIIRS-AF ascii output file to read from.
            afdata: An already-loaded detections dataframe; when given, no
                file reading is done and its ``attrs`` become the metadata.
            platform_name: Name of the satellite platform (e.g. from the
                incoming posttroll message).
            timezone: Time zone name used when converting observation times
                to local time. Defaults to 'GMT'.
        """
        self.input_filepath = filepath
        self._afdata = afdata
        if afdata is None:
            self.metadata = {}
        else:
            # Carry over the metadata stored on the dataframe itself.
            self.metadata = afdata.attrs

        self.timezone = timezone
        self.platform_name = platform_name

    def get_af_data(self, filepattern=None, localtime=True):
        """Read the Active Fire results from file - ascii formatted output from CSPP VIIRS-AF.

        If detections were already provided (or previously read), they are
        returned as-is with the instance metadata merged into their attrs.
        Otherwise the input file is parsed and start/end times are attached.

        Raises:
            AttributeError: If reading from file is required but no
                *filepattern* was provided.
        """
        if self._afdata is not None:
            # Make sure the attrs are populated with metadata instance attribute
            self._afdata.attrs.update(self.metadata)
            return self._afdata

        if not self.input_filepath or not os.path.exists(self.input_filepath):
            # FIXME! Better to raise an exception!?
            return self._afdata

        if not filepattern:
            raise AttributeError("file pattern must be provided in order to be able to read from file!")

        self.metadata = self._get_metadata_from_filename(filepattern)
        self._afdata = _read_data(self.input_filepath)
        self._add_start_and_end_time_to_active_fires_data(localtime)

        return self._afdata

    def _get_metadata_from_filename(self, infile_pattern):
        """From the filename retrieve the metadata such as satellite and sensing time."""
        return get_metadata_from_filename(infile_pattern, self.input_filepath)

    def _add_start_and_end_time_to_active_fires_data(self, localtime):
        """Add start and end time to active fires data.

        The times from the filename metadata are converted either to the
        instance time zone (when *localtime* is True) or kept as GMT, then
        stored tz-naive in 'starttime'/'endtime' columns, one value repeated
        per detection row.
        """
        if localtime:
            logger.info("Convert to local time zone!")
            starttime = datetime_utc2local(self.metadata['start_time'], self.timezone)
            endtime = datetime_utc2local(self.metadata['end_time'], self.timezone)
        else:
            starttime = datetime_utc2local(self.metadata['start_time'], 'GMT')
            endtime = datetime_utc2local(self.metadata['end_time'], 'GMT')

        # Drop the tz-info so the values can be cast to numpy datetime64.
        starttime = starttime.replace(tzinfo=None)
        endtime = endtime.replace(tzinfo=None)

        self._afdata['starttime'] = np.repeat(starttime, len(self._afdata)).astype(np.datetime64)
        self._afdata['endtime'] = np.repeat(endtime, len(self._afdata)).astype(np.datetime64)

        logger.info('Start and end times: %s %s',
                    str(self._afdata['starttime'][0]),
                    str(self._afdata['endtime'][0]))

    def _apply_timezone_offset(self, obstime):
        """Apply the time zone offset to the datetime objects.

        Returns the tz-naive *obstime* shifted by the instance time zone
        offset, repeated once per detection row as numpy datetime64.
        """
        obstime_offset = get_local_timezone_offset(self.timezone)
        return np.repeat(obstime.replace(tzinfo=None) + obstime_offset,
                         len(self._afdata)).astype(np.datetime64)

    def fires_filtering(self, shapefile, start_geometries_index=1, inside=True):
        """Remove fires outside National borders or filter out potential false detections.

        If *inside* is True the filtering will keep those detections that are inside the polygon.
        If *inside* is False the filtering will disregard the detections that are inside the polygon.
        """
        detections = self._afdata

        lons = detections.longitude.values
        lats = detections.latitude.values

        # Time the (potentially expensive) point-in-polygon test.
        toc = time.time()
        points_inside = get_global_mask_from_shapefile(shapefile, (lons, lats), start_geometries_index)
        logger.debug("Time used checking inside polygon - mpl path method: %f", time.time() - toc)

        # Keep rows whose mask value matches the requested inside/outside mode.
        self._afdata = detections[points_inside == inside]

        if len(self._afdata) == 0:
            logger.debug("No fires after filtering on Polygon...")
        else:
            logger.debug("Number of detections after filtering on Polygon: %d", len(self._afdata))

    def get_regional_filtermasks(self, shapefile, globstr):
        """Get the regional filter masks from the shapefile.

        Returns a dict keyed on the region name ('Testomr' attribute), each
        value holding the boolean 'mask' over the detections, the raw shape
        'attributes', and the flags 'all_inside_test_area' /
        'some_inside_test_area'.
        """
        detections = self._afdata

        lons = detections.longitude.values
        lats = detections.latitude.values

        logger.debug("Before ShapeGeometry instance - shapefile name = %s" % str(shapefile))
        logger.debug("Shape file glob-string = %s" % str(globstr))
        shape_geom = ShapeGeometry(shapefile, globstr)
        shape_geom.load()

        # Project the geographical detection positions into the shapefile CRS.
        p__ = pyproj.Proj(shape_geom.proj4str)
        metersx, metersy = p__(lons, lats)
        points = np.vstack([metersx, metersy]).T

        regional_masks = {}

        for attr, geometry in zip(shape_geom.attributes, shape_geom.geometries):
            test_omr = attr['Testomr']
            all_inside_test_omr = False
            some_inside_test_omr = False
            logger.debug(u'Test area: {}'.format(str(test_omr)))

            regional_masks[test_omr] = {'mask': None, 'attributes': attr}

            if isinstance(geometry, shapely.geometry.multipolygon.MultiPolygon):
                regional_masks[test_omr]['mask'] = get_mask_from_multipolygon(points, geometry)
            else:
                # Single polygon: test against its exterior ring only.
                shape = geometry
                pth = Path(shape.exterior.coords)
                regional_masks[test_omr]['mask'] = pth.contains_points(points)

            if sum(regional_masks[test_omr]['mask']) == len(points):
                all_inside_test_omr = True
                some_inside_test_omr = True
                logger.debug("All points inside test area!")
            elif sum(regional_masks[test_omr]['mask']) > 0:
                some_inside_test_omr = True
                logger.debug("Some points inside test area!")

            regional_masks[test_omr]['all_inside_test_area'] = all_inside_test_omr
            regional_masks[test_omr]['some_inside_test_area'] = some_inside_test_omr

        return regional_masks
217

218

219
def _read_data(filepath):
    """Read the AF data.

    Parses the ascii CSPP VIIRS-AF output at *filepath* into a pandas
    DataFrame with the columns given by COL_NAMES; '#' lines are comments.
    """
    with open(filepath, 'r') as file_stream:
        frame = pd.read_csv(file_stream, index_col=None, header=None,
                            comment='#', names=COL_NAMES)
    return frame
223

224

225
def get_metadata_from_filename(infile_pattern, filepath):
    """From the filename and its pattern get basic metadata of the satellite observations.

    Returns the parsed fields as a dict with a corrected 'end_time' entry,
    or None when the file name does not match the pattern.
    """
    pattern_parser = Parser(infile_pattern)
    basename = os.path.basename(filepath)
    try:
        res = pattern_parser.parse(basename)
    except ValueError:
        # Do something!
        return None

    # Fix the end time: the filename only carries hour/minute/second for the
    # end of the pass, so rebuild it on the start date and roll over midnight
    # when it comes out before the start time.
    start = res['start_time']
    end_hour = res['end_hour']
    endtime = datetime(start.year, start.month, start.day,
                       end_hour.hour, end_hour.minute, end_hour.second)
    if endtime < start:
        endtime += timedelta(days=1)

    res['end_time'] = endtime
    return res
245

246

247
def store(output_filename, detections):
    """Store the filtered AF detections on disk.

    Writes *detections* as CSV to *output_filename* and returns the path;
    returns None (writing nothing) when there are no detections.
    """
    if len(detections) == 0:
        logger.debug("No detections to save!")
        return None

    detections.to_csv(output_filename, index=False)
    return output_filename
255

256

257
def geojson_feature_collection_from_detections(detections, platform_name=None):
    """Create the Geojson feature collection from fire detection data.

    Each detection row becomes a Point feature carrying power, brightness
    temperature, confidence and the mid-granule observation time; optional
    columns (tb_celcius, detection_id) and the platform name are included
    when available.

    Raises:
        ValueError: If *detections* is empty.
    """
    if len(detections) == 0:
        raise ValueError("No detections to save!")

    # Convert points to GeoJSON
    features = []
    for row_index in range(len(detections)):
        row = detections.iloc[row_index]
        start = row.starttime.to_pydatetime()
        end = row.endtime.to_pydatetime()
        # Use the middle of the granule as the observation time.
        mean_granule_time = start + (end - start) / 2.

        properties = {'power': row.power,
                      'tb': row.tb,
                      'confidence': int(row.conf),
                      'observation_time': json_serial(mean_granule_time)
                      }

        # Optional columns - present only when earlier processing added them.
        try:
            properties['tb_celcius'] = row.tb_celcius
        except AttributeError:
            logger.debug("Failed adding the TB in celcius!")
        try:
            properties['id'] = row.detection_id
        except AttributeError:
            logger.debug("Failed adding the unique detection id!")

        if platform_name:
            properties['platform_name'] = platform_name
        else:
            logger.debug("No platform name specified for output")

        point = Point(map(float, [row.longitude, row.latitude]))
        features.append(Feature(geometry=point, properties=properties))

    return FeatureCollection(features)
298

299

300
def store_geojson(output_filename, feature_collection):
    """Store the Geojson feature collection of fire detections on disk.

    Creates any missing parent directories of *output_filename* first.

    Args:
        output_filename: Path of the geojson file to write.
        feature_collection: The geojson FeatureCollection to dump.
    """
    path = os.path.dirname(output_filename)
    # Guard on a non-empty dirname: for a bare filename os.path.dirname
    # returns '' and os.makedirs('') would raise. exist_ok avoids a race
    # if another process creates the directory between check and creation.
    if path and not os.path.exists(path):
        logger.info("Create directory: %s", path)
        os.makedirs(path, exist_ok=True)

    with open(output_filename, 'w') as fpt:
        dump(feature_collection, fpt)
309

310

311
def get_mask_from_multipolygon(points, geometry, start_idx=1):
    """Get mask for points from a shapely Multipolygon.

    Args:
        points: (N, 2) array of point coordinates in the same projection
            as *geometry*.
        geometry: A shapely MultiPolygon to test the points against.
        start_idx: Index of the first remaining constituent polygon to
            test after the initial polygon (polygons before it are skipped).

    Returns:
        Boolean array of length N - True where a point lies inside at
        least one of the tested polygons.

    NOTE(review): only each polygon's exterior ring is tested - interior
    holes are ignored. Also, slicing ``geometry.geoms`` and then accessing
    ``.geoms`` on the slice relies on the slice being a geometry sequence
    wrapper - confirm against the shapely version in use.
    """
    # Start with the first constituent polygon of the multi-polygon.
    shape = geometry.geoms[0]
    pth = Path(shape.exterior.coords)
    mask = pth.contains_points(points)

    # Shortcut: every point is already inside the first polygon.
    if sum(mask) == len(points):
        return mask

    constituent_part = geometry.geoms[start_idx:]
    for shape in constituent_part.geoms:
        pth = Path(shape.exterior.coords)
        # Accumulate: a point counts as inside if any polygon contains it.
        mask = np.logical_or(mask, pth.contains_points(points))
        # Stop early once all points are covered.
        if sum(mask) == len(points):
            break

    return mask
328

329

330
def get_global_mask_from_shapefile(shapefile, lonlats, start_geom_index=0):
    """Given geographical (lon,lat) points get a mask to apply when filtering.

    Loads the (single multi-polygon) geometry of *shapefile*, projects the
    lon/lat points into its coordinate system and returns the boolean
    inside-mask for the points.
    """
    longitudes, latitudes = lonlats
    logger.debug("Getting the global mask from file: shapefile file path = %s", str(shapefile))

    shape_geom = ShapeGeometry(shapefile)
    shape_geom.load()

    # Project the geographical positions into the shapefile's CRS.
    projection = pyproj.Proj(shape_geom.proj4str)
    xcoords, ycoords = projection(longitudes, latitudes)
    points = np.vstack([xcoords, ycoords]).T

    # There is only one geometry/multi-polygon in the file!
    return get_mask_from_multipolygon(points, shape_geom.geometries[0], start_geom_index)
346

347

348
class ActiveFiresPostprocessing(Thread):
    """The active fires post processor.

    A posttroll-driven thread: listens for messages pointing at CSPP
    VIIRS-AF output files, filters the detections against national-border
    and false-alarm shapefiles (and optionally regional masks), writes
    geojson output and publishes messages about the results.
    """

    def __init__(self, configfile, shp_borders, shp_mask, regional_filtermask=None):
        """Initialize the active fires post processor class.

        Args:
            configfile: Path to the yaml configuration file.
            shp_borders: Shapefile with the national borders (keep inside).
            shp_mask: Shapefile with the false-alarm mask (filter out inside).
            regional_filtermask: Optional shapefile with regional masks.
        """
        super().__init__()
        self.shp_borders = shp_borders
        self.shp_filtermask = shp_mask

        self.regional_filtermask = regional_filtermask
        self.configfile = configfile
        self.options = {}

        config = read_config(self.configfile)
        self._set_options_from_config(config)

        self.host = socket.gethostname()
        self.timezone = self.options.get('timezone', 'GMT')

        self.input_topic = self.options['subscribe_topics'][0]
        self.output_topic = self.options['publish_topic']
        self.infile_pattern = self.options.get('af_pattern_ibands')

        self.outfile_pattern_national = self.options.get('geojson_file_pattern_national')
        self.outfile_pattern_regional = self.options.get('geojson_file_pattern_regional')

        self.output_dir = self.options.get('output_dir', '/tmp')
        # File caching the last used detection id between runs.
        self.filepath_detection_id_cache = self.options.get('filepath_detection_id_cache')

        frmt = self.options['regional_shapefiles_format']
        self.regional_shapefiles_globstr = globify(frmt)

        self._fire_detection_id = None
        self._initialize_fire_detection_id()

        self.listener = None
        self.publisher = None
        self.loop = False
        self._setup_and_start_communication()

    def _setup_and_start_communication(self):
        """Set up the Posttroll communication and start the publisher."""
        logger.debug("Starting up... Input topic: %s", self.input_topic)
        now = datetime_utc2local(datetime.now(), self.timezone)
        logger.debug("Output times for timezone: {zone} Now = {time}".format(zone=str(self.timezone), time=now))

        # Initialize the Kelvin->Celsius unit converter once; Pint start-up
        # is slow enough to be worth timing.
        tic = time.time()
        units = {'temperature': 'degC'}
        self.unit_converter = UnitConverter(units)
        logger.debug("Unit conversion initialization with Pint took %f seconds.", time.time() - tic)

        self._check_borders_shapes_exists()

        self.listener = ListenerContainer(topics=[self.input_topic])
        self.publisher = NoisyPublisher("active_fires_postprocessing")
        self.publisher.start()
        self.loop = True
        # Allow a clean shutdown on SIGTERM.
        signal.signal(signal.SIGTERM, self.signal_shutdown)

    def _set_options_from_config(self, config):
        """From the configuration on disk set the option dictionary, holding all metadata for processing."""
        for item in config:
            if not isinstance(config[item], dict):
                self.options[item] = config[item]

        # Comma-separated topic strings are split into lists; empty entries
        # (e.g. from trailing commas) are dropped.
        if isinstance(self.options.get('subscribe_topics'), str):
            subscribe_topics = self.options.get('subscribe_topics').split(',')
            for item in subscribe_topics:
                if len(item) == 0:
                    subscribe_topics.remove(item)
            self.options['subscribe_topics'] = subscribe_topics

        if isinstance(self.options.get('publish_topics'), str):
            publish_topics = self.options.get('publish_topics').split(',')
            for item in publish_topics:
                if len(item) == 0:
                    publish_topics.remove(item)
            self.options['publish_topics'] = publish_topics

    def signal_shutdown(self, *args, **kwargs):
        """Shutdown the Active Fires postprocessing."""
        self.close()

    def check_incoming_message_and_get_filename(self, msg):
        """Check the message content and return filename if okay.

        Returns None (after publishing no-fires messages when appropriate)
        if the message type, the file existence or the file type check fails.
        """
        if msg.type not in ['file', 'collection', 'dataset']:
            logger.debug("Message type not supported: %s", str(msg.type))
            return None

        filename = get_filename_from_uri(msg.data.get('uri'))
        if not os.path.exists(filename):
            logger.warning("File does not exist: %s", filename)
            return None

        file_ok = check_file_type_okay(msg.data.get('type'))
        if not file_ok:
            # Not a txt file - announce that there are no fires instead.
            output_messages = self._generate_no_fires_messages(msg, NO_FIRES_TEXT)
            for output_msg in output_messages:
                logger.debug("Sending message: %s", str(output_msg))
                self.publisher.send(str(output_msg))
            return None

        return filename

    def do_postprocessing_on_message(self, msg, filename):
        """Do the fires post processing on a message.

        Reads the detections, applies national filtering, adds ids and
        Celsius temperatures, writes the national geojson, publishes the
        results, then optionally performs regional filtering/publishing.
        """
        platform_name = msg.data.get('platform_name')
        af_shapeff = ActiveFiresShapefileFiltering(filename, platform_name=platform_name,
                                                   timezone=self.timezone)
        afdata = af_shapeff.get_af_data(self.infile_pattern)
        if len(afdata) == 0:
            output_messages = self._generate_no_fires_messages(msg, NO_FIRES_TEXT)
            for output_msg in output_messages:
                logger.debug("Sending message: %s", str(output_msg))
                self.publisher.send(str(output_msg))
            return

        afdata = self.fires_filtering(msg, af_shapeff)
        logger.debug("After fires_filtering...: Number of fire detections = %d", len(afdata))
        if len(afdata) == 0:
            logger.debug("No fires - so no regional filtering to be done!")
            return

        # Persist the running detection id so it survives restarts.
        afdata = self.add_unique_day_id(afdata)
        self.save_id_to_file()

        afdata = self.add_tb_celcius(afdata)

        fmda = af_shapeff.metadata
        pout = Parser(self.outfile_pattern_national)
        out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
        logger.debug("Output file path = %s", out_filepath)

        # 1) Create geojson feature collection
        # 2) Dump geojson data to disk
        try:
            feature_collection = geojson_feature_collection_from_detections(afdata,
                                                                            platform_name=af_shapeff.platform_name)
        except ValueError:
            logger.info("No geojson file created, number of fires after filtering = %d", len(afdata))
            output_messages = self._generate_no_fires_messages(msg,
                                                               'No true fire detections inside National borders')  # noqa
        else:
            store_geojson(out_filepath, feature_collection)
            output_messages = self.get_output_messages(out_filepath, msg, len(afdata))

        for output_msg in output_messages:
            if output_msg:
                logger.debug("Sending message: %s", str(output_msg))
                self.publisher.send(str(output_msg))

        # Do the regional filtering now:
        if not self.regional_filtermask:
            logger.info("No regional filtering is attempted.")
            return

        # FIXME! If afdata is empty (len=0) then it seems all data are inside all regions!
        af_shapeff = ActiveFiresShapefileFiltering(afdata=afdata, platform_name=platform_name,
                                                   timezone=self.timezone)
        regional_fmask = af_shapeff.get_regional_filtermasks(self.regional_filtermask,
                                                             globstr=self.regional_shapefiles_globstr)
        regional_messages = self.regional_fires_filtering_and_publishing(msg, regional_fmask, af_shapeff)
        for region_msg in regional_messages:
            logger.debug("Sending message: %s", str(region_msg))
            self.publisher.send(str(region_msg))

    def run(self):
        """Run the AF post processing.

        Main loop: poll the listener queue and post-process each valid
        incoming message until the loop flag is cleared by close().
        """
        while self.loop:
            try:
                msg = self.listener.output_queue.get(timeout=1)
                logger.debug("Message: %s", str(msg.data))
            except Empty:
                # Timeout without a message - keep polling.
                continue
            else:
                filename = self.check_incoming_message_and_get_filename(msg)
                if not filename:
                    continue

                self.do_postprocessing_on_message(msg, filename)

    def regional_fires_filtering_and_publishing(self, msg, regional_fmask, afsff_obj):
        """From the regional-fires-filter-mask and the fire detection data send regional messages."""
        logger.debug("Perform regional masking on VIIRS AF detections and publish accordingly.")

        afdata = afsff_obj.get_af_data()
        fmda = afsff_obj.metadata

        fmda['platform'] = afsff_obj.platform_name

        pout = Parser(self.outfile_pattern_regional)

        output_messages = []
        regions_with_detections = 0
        for region_name in regional_fmask:
            # Skip regions with no detections inside.
            if not regional_fmask[region_name]['some_inside_test_area']:
                continue

            regions_with_detections = regions_with_detections + 1
            fmda['region_name'] = regional_fmask[region_name]['attributes']['Kod_omr']

            out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
            logger.debug("Output file path = %s", out_filepath)
            data_in_region = afdata[regional_fmask[region_name]['mask']]

            try:
                feature_collection = geojson_feature_collection_from_detections(data_in_region,
                                                                                platform_name=fmda['platform'])
            except ValueError:
                logger.warning("Something wrong happended storing regional " +
                               "data to Geojson - area: {name}".format(name=str(region_name)))
                continue

            store_geojson(out_filepath, feature_collection)

            outmsg = self._generate_output_message(out_filepath, msg, regional_fmask[region_name])
            output_messages.append(outmsg)
            logger.info("Geojson file created! Number of fires in region = %d", len(data_in_region))

        logger.debug("Regional masking done. Number of regions with fire " +
                     "detections on this granule: %s" % str(regions_with_detections))
        return output_messages

    def fires_filtering(self, msg, af_shapeff):
        """Read Active Fire data and perform spatial filtering removing false detections.

        Do the national filtering first, and then filter out potential false
        detections by the special mask for that.

        """
        logger.debug("Read VIIRS AF detections and perform quality control and spatial filtering")

        fmda = af_shapeff.metadata
        # metdata contains time and everything but it is not being transfered to the dataframe.attrs

        pout = Parser(self.outfile_pattern_national)
        out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
        logger.debug("Output file path = %s", out_filepath)

        # National filtering:
        af_shapeff.fires_filtering(self.shp_borders)

        # Metadata should be transfered here!
        afdata_ff = af_shapeff.get_af_data()

        if len(afdata_ff) > 0:
            # False-alarm mask: drop detections *inside* the mask polygons.
            logger.debug("Doing the fires filtering: shapefile-mask = %s", str(self.shp_filtermask))
            af_shapeff.fires_filtering(self.shp_filtermask, start_geometries_index=0, inside=False)
            afdata_ff = af_shapeff.get_af_data()
            logger.debug("After fires_filtering: Number of fire detections left: %d", len(afdata_ff))

        return afdata_ff

    def get_output_messages(self, filepath, msg, number_of_data):
        """Generate the adequate output message(s) depending on if an output file was created or not."""
        logger.info("Geojson file created! Number of fires = %d", number_of_data)
        return [self._generate_output_message(filepath, msg)]

    def _generate_output_message(self, filepath, input_msg, region=None):
        """Create the output message to publish."""
        output_topic = generate_posttroll_topic(self.output_topic, region)
        to_send = prepare_posttroll_message(input_msg, region)
        to_send['uri'] = str(filepath)
        to_send['uid'] = os.path.basename(filepath)
        to_send['type'] = 'GEOJSON-filtered'
        to_send['format'] = 'geojson'
        to_send['product'] = 'afimg'
        pubmsg = Message(output_topic, 'file', to_send)
        return pubmsg

    def _generate_no_fires_messages(self, input_msg, msg_string):
        """Create the output messages to publish.

        One info message per output scope (National and Regional).
        """
        to_send = prepare_posttroll_message(input_msg)
        to_send['info'] = msg_string
        publish_messages = []
        for ext in ['National', 'Regional']:
            topic = self.output_topic + '/' + ext
            publish_messages.append(Message(topic, 'info', to_send))

        return publish_messages

    def _check_borders_shapes_exists(self):
        """Check that the national borders shapefile exists on disk.

        Raises:
            OSError: If the shapefile path does not exist.
        """
        if not os.path.exists(self.shp_borders):
            raise OSError("Shape file does not exist! Filename = %s" % self.shp_borders)

    def _initialize_fire_detection_id(self):
        """Initialize the fire detection ID.

        Reads the cached id from disk when available, otherwise starts a
        fresh counter at the current UTC time.
        """
        if self.filepath_detection_id_cache and os.path.exists(self.filepath_detection_id_cache):
            self._fire_detection_id = self.get_id_from_file()
        else:
            self._fire_detection_id = {'date': datetime.utcnow(), 'counter': 0}

    def update_fire_detection_id(self):
        """Update the fire detection ID registry.

        Resets the counter when a new day has started (either more than 24h
        elapsed, or the calendar day changed), then increments the counter.
        """
        now = datetime.utcnow()
        tdelta = now - self._fire_detection_id['date']
        if tdelta.total_seconds() > 24*3600:
            self._initialize_fire_detection_id()
        elif tdelta.total_seconds() > 0 and self._fire_detection_id['date'].day != now.day:
            self._initialize_fire_detection_id()

        self._fire_detection_id['counter'] = self._fire_detection_id['counter'] + 1

    def save_id_to_file(self):
        """Save the (current) detection id on disk.

        It is assumed that the user permissions are so that a file can actually
        be written to disk here!
        """
        with open(self.filepath_detection_id_cache, 'w') as fpt:
            id_ = self._create_id_string()
            fpt.write(id_)

    def get_id_from_file(self):
        """Read the latest stored detection id string from disk and convert to internal format."""
        with open(self.filepath_detection_id_cache, 'r') as fpt:
            idstr = fpt.read()

        return self._get_id_from_string(idstr)

    def _get_id_from_string(self, idstr):
        """Get the detection id from string.

        The string format is '<YYYYmmdd>-<counter>'.
        """
        datestr, counter = idstr.split('-')
        return {'date': datetime.strptime(datestr, '%Y%m%d'),
                'counter': int(counter)}

    def _create_id_string(self):
        """From the internal fire detection id create the id string to be exposed to the user."""
        return (self._fire_detection_id['date'].strftime('%Y%m%d') +
                '-' + str(self._fire_detection_id['counter']))

    def add_unique_day_id(self, afdata):
        """Add a unique detection id - date + a running number for the day."""
        # Add id's to the detections:
        id_list = []
        for _i in range(len(afdata)):
            self.update_fire_detection_id()
            id_ = self._create_id_string()
            id_list.append(id_)

        afdata['detection_id'] = id_list
        return afdata

    def add_tb_celcius(self, data_frame):
        """Add a column with TB in Celcius to the fire detection data frame.

        Uses the unit converter created in _setup_and_start_communication.
        """
        def kelvin2celcius(x):
            tb_c = self.unit_converter.convert('temperature', x)
            return tb_c.magnitude

        data_frame['tb_celcius'] = data_frame['tb'].apply(kelvin2celcius)
        return data_frame

    def close(self):
        """Shutdown the Active Fires postprocessing."""
        logger.info('Terminating Active Fires post processing.')
        self.loop = False
        # Persist the current detection id so the next run continues from it.
        logger.info('Dumping the latest detection id to disk: %s', str(self.filepath_detection_id_cache))
        self.save_id_to_file()
        try:
            self.listener.stop()
        except Exception:
            logger.exception("Couldn't stop listener.")
        if self.publisher:
            try:
                self.publisher.stop()
            except Exception:
                logger.exception("Couldn't stop publisher.")
716

717

718
def check_file_type_okay(file_type):
    """Check if the file is of the correct type.

    Only 'txt'/'TXT' files (the CSPP VIIRS-AF ascii output) are accepted.
    """
    if file_type in ('txt', 'TXT'):
        return True

    logger.info('File type not txt: %s', str(file_type))
    return False
724

725

726
def get_filename_from_uri(uri):
    """Get the file name from the uri given.

    Returns the path component of *uri* (for a plain path the whole string).
    """
    logger.info('File uri: %s', str(uri))
    parsed = urlparse(uri)
    return parsed.path
731

732

733
def generate_posttroll_topic(output_topic, region=None):
    """Create the topic for the posttroll message to publish.

    With a *region* the topic gets a '/Regional/<code>' suffix, otherwise
    '/National' is appended.
    """
    if not region:
        return output_topic + '/National'

    region_code = region['attributes']['Kod_omr']
    return output_topic + '/Regional/' + region_code
741

742

743
def prepare_posttroll_message(input_msg, region=None):
    """Create the basic posttroll-message fields and return.

    Copies the incoming message data, strips the per-file keys that will be
    re-set by the caller, and adds region name/code when a region is given.
    """
    to_send = input_msg.data.copy()
    for stale_key in ('dataset', 'collection', 'uri', 'uid', 'format', 'type'):
        to_send.pop(stale_key, None)

    # FIXME! Check that the region_name is stored as a unicode string!
    if region:
        to_send['region_name'] = region['attributes']['Testomr']
        to_send['region_code'] = region['attributes']['Kod_omr']

    return to_send
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc