• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

adybbroe / activefires-pp / 5585325056

pending completion
5585325056

Pull #15

github

web-flow
Merge 60084eada into 33a20c265
Pull Request #15: Add unit conversion support

266 of 417 branches covered (63.79%)

115 of 120 new or added lines in 6 files covered. (95.83%)

2 existing lines in 1 file now uncovered.

2000 of 2405 relevant lines covered (83.16%)

3.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.72
/activefires_pp/post_processing.py
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3

4
# Copyright (c) 2021 - 2023 Adam.Dybbroe
5

6
# Author(s):
7

8
#   Adam Dybbroe <Firstname.Lastname@smhi.se>
9

10
# This program is free software: you can redistribute it and/or modify
11
# it under the terms of the GNU General Public License as published by
12
# the Free Software Foundation, either version 3 of the License, or
13
# (at your option) any later version.
14

15
# This program is distributed in the hope that it will be useful,
16
# but WITHOUT ANY WARRANTY; without even the implied warranty of
17
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18
# GNU General Public License for more details.
19

20
# You should have received a copy of the GNU General Public License
21
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
22

23
"""Post processing on the Active Fire detections."""
4✔
24

25
import socket
4✔
26
from trollsift import Parser, globify
4✔
27
import time
4✔
28
import pandas as pd
4✔
29
from datetime import datetime, timedelta
4✔
30
import numpy as np
4✔
31
import os
4✔
32
from six.moves.urllib.parse import urlparse
4✔
33
from geojson import Feature, Point, FeatureCollection, dump
4✔
34
import logging
4✔
35
import signal
4✔
36
from queue import Empty
4✔
37
from threading import Thread
4✔
38
from posttroll.listener import ListenerContainer
4✔
39
from posttroll.message import Message
4✔
40
from posttroll.publisher import NoisyPublisher
4✔
41
import pyproj
4✔
42
from matplotlib.path import Path
4✔
43
import shapely
4✔
44

45
from activefires_pp.utils import datetime_utc2local
4✔
46
from activefires_pp.utils import UnitConverter
4✔
47
from activefires_pp.utils import get_local_timezone_offset
4✔
48
from activefires_pp.utils import json_serial
4✔
49
from activefires_pp.config import read_config
4✔
50
from activefires_pp.geometries_from_shapefiles import ShapeGeometry
4✔
51

52
# M-band output:
53
# column 1: latitude of fire pixel (degrees)
54
# column 2: longitude of fire pixel (degrees)
55
# column 3: M13 brightness temperature of fire pixel (K)
56
# column 4: Along-scan fire pixel resolution (km)
57
# column 5: Along-track fire pixel resolution (km)
58
# column 6: detection confidence (%)
59
#           7=low, 8=nominal, 9=high (Imager Resolution)
60
#           0-100 % (Moderate Resolution)
61
# column 7: fire radiative power (MW)
62
# I-band output:
63
# column 1: latitude of fire pixel (degrees)
64
# column 2: longitude of fire pixel (degrees)
65
# column 3: I04 brightness temperature of fire pixel (K)
66
# column 4: Along-scan fire pixel resolution (km)
67
# column 5: Along-track fire pixel resolution (km)
68
# column 6: detection confidence ([7,8,9]->[lo,med,hi])
69
# column 7: fire radiative power (MW)
70
#
71
# Column names of the CSPP VIIRS AF ascii output files. Both the I-band and
# M-band files share this 7-column layout (see the column description above).
COL_NAMES = ["latitude", "longitude", "tb", "along_scan_res", "along_track_res", "conf", "power"]

# Info text published downstream when a granule holds no fire detections.
NO_FIRES_TEXT = 'No fire detections for this granule'


logger = logging.getLogger(__name__)
# Fiona is verbose when reading shapefiles - keep it to warnings only.
logging.getLogger("fiona").setLevel(logging.WARNING)
4✔
78

79

80
class ActiveFiresShapefileFiltering(object):
    """Reading, filtering and writing Active Fire detections.

    Reading either the CSPP VIIRS AF output (txt files) or the Geojson formatted files.
    Filtering for static and false alarms, and/or simply on geographical regions.
    Data is stored in geojson format.
    """

    def __init__(self, filepath=None, afdata=None, platform_name=None, timezone='GMT'):
        """Initialize the ActiveFiresShapefileFiltering class.

        Args:
            filepath: Path to a CSPP VIIRS AF ascii file to read data from.
            afdata: Already-loaded fire detection data (a pandas DataFrame);
                when given, *filepath* is not read.
            platform_name: Name of the satellite platform the data come from.
            timezone: Time zone name used when converting observation times
                to local time (defaults to 'GMT' = no offset).
        """
        self.input_filepath = filepath
        self._afdata = afdata
        if afdata is None:
            self.metadata = {}
        else:
            # Carry over the metadata already attached to the dataframe.
            self.metadata = afdata.attrs

        self.timezone = timezone
        self.platform_name = platform_name

    def get_af_data(self, filepattern=None, localtime=True):
        """Read the Active Fire results from file - ascii formatted output from CSPP VIIRS-AF.

        If data was already provided (or previously read) it is returned
        directly, after syncing the dataframe attrs with the instance
        metadata. Otherwise the input file is parsed, and start/end times
        are added to the data.
        """
        if self._afdata is not None:
            # Make sure the attrs are populated with metadata instance attribute
            self._afdata.attrs.update(self.metadata)
            return self._afdata

        if not self.input_filepath or not os.path.exists(self.input_filepath):
            # FIXME! Better to raise an exception!?
            return self._afdata

        if not filepattern:
            raise AttributeError("file pattern must be provided in order to be able to read from file!")

        self.metadata = self._get_metadata_from_filename(filepattern)
        self._afdata = _read_data(self.input_filepath)
        self._add_start_and_end_time_to_active_fires_data(localtime)

        return self._afdata

    def _get_metadata_from_filename(self, infile_pattern):
        """From the filename retrieve the metadata such as satellite and sensing time."""
        return get_metadata_from_filename(infile_pattern, self.input_filepath)

    def _add_start_and_end_time_to_active_fires_data(self, localtime):
        """Add start and end time to active fires data.

        The (identical) granule start/end times are repeated over all rows
        of the dataframe, as naive datetimes - either converted to the
        instance time zone or kept at GMT.
        """
        if localtime:
            logger.info("Convert to local time zone!")
            starttime = datetime_utc2local(self.metadata['start_time'], self.timezone)
            endtime = datetime_utc2local(self.metadata['end_time'], self.timezone)
        else:
            starttime = datetime_utc2local(self.metadata['start_time'], 'GMT')
            endtime = datetime_utc2local(self.metadata['end_time'], 'GMT')

        # Strip the tzinfo so the values can be stored as numpy datetime64:
        starttime = starttime.replace(tzinfo=None)
        endtime = endtime.replace(tzinfo=None)

        self._afdata['starttime'] = np.repeat(starttime, len(self._afdata)).astype(np.datetime64)
        self._afdata['endtime'] = np.repeat(endtime, len(self._afdata)).astype(np.datetime64)

        logger.info('Start and end times: %s %s',
                    str(self._afdata['starttime'][0]),
                    str(self._afdata['endtime'][0]))

    def _apply_timezone_offset(self, obstime):
        """Apply the time zone offset to the datetime objects."""
        obstime_offset = get_local_timezone_offset(self.timezone)
        return np.repeat(obstime.replace(tzinfo=None) + obstime_offset,
                         len(self._afdata)).astype(np.datetime64)

    def fires_filtering(self, shapefile, start_geometries_index=1, inside=True):
        """Remove fires outside National borders or filter out potential false detections.

        If *inside* is True the filtering will keep those detections that are inside the polygon.
        If *inside* is False the filtering will disregard the detections that are inside the polygon.
        """
        detections = self._afdata

        lons = detections.longitude.values
        lats = detections.latitude.values

        toc = time.time()
        points_inside = get_global_mask_from_shapefile(shapefile, (lons, lats), start_geometries_index)
        logger.debug("Time used checking inside polygon - mpl path method: %f", time.time() - toc)

        # Keep rows where the mask matches the requested inside/outside flag:
        self._afdata = detections[points_inside == inside]

        if len(self._afdata) == 0:
            logger.debug("No fires after filtering on Polygon...")
        else:
            logger.debug("Number of detections after filtering on Polygon: %d", len(self._afdata))

    def get_regional_filtermasks(self, shapefile, globstr):
        """Get the regional filter masks from the shapefile.

        Returns a dict keyed on the region ('Testomr') attribute, where each
        value holds the boolean point mask, the shapefile attributes, and two
        flags telling whether all/some detections fall inside that region.
        """
        detections = self._afdata

        lons = detections.longitude.values
        lats = detections.latitude.values

        logger.debug("Before ShapeGeometry instance - shapefile name = %s" % str(shapefile))
        logger.debug("Shape file glob-string = %s" % str(globstr))
        shape_geom = ShapeGeometry(shapefile, globstr)
        shape_geom.load()

        # Project the detections into the shapefile's coordinate system:
        p__ = pyproj.Proj(shape_geom.proj4str)
        metersx, metersy = p__(lons, lats)
        points = np.vstack([metersx, metersy]).T

        regional_masks = {}

        for attr, geometry in zip(shape_geom.attributes, shape_geom.geometries):
            test_omr = attr['Testomr']
            all_inside_test_omr = False
            some_inside_test_omr = False
            logger.debug(u'Test area: {}'.format(str(test_omr)))

            regional_masks[test_omr] = {'mask': None, 'attributes': attr}

            if isinstance(geometry, shapely.geometry.multipolygon.MultiPolygon):
                regional_masks[test_omr]['mask'] = get_mask_from_multipolygon(points, geometry)
            else:
                # Single polygon: only the exterior ring is tested.
                shape = geometry
                pth = Path(shape.exterior.coords)
                regional_masks[test_omr]['mask'] = pth.contains_points(points)

            if sum(regional_masks[test_omr]['mask']) == len(points):
                all_inside_test_omr = True
                some_inside_test_omr = True
                logger.debug("All points inside test area!")
            elif sum(regional_masks[test_omr]['mask']) > 0:
                some_inside_test_omr = True
                logger.debug("Some points inside test area!")

            regional_masks[test_omr]['all_inside_test_area'] = all_inside_test_omr
            regional_masks[test_omr]['some_inside_test_area'] = some_inside_test_omr

        return regional_masks
×
217

218

219
def _read_data(filepath):
    """Read the CSPP VIIRS AF ascii output into a pandas DataFrame.

    Comment lines (starting with '#') are skipped and the fixed AF column
    names are applied.
    """
    with open(filepath, 'r') as fh_:
        frame = pd.read_csv(fh_, index_col=None, header=None, comment='#', names=COL_NAMES)
    return frame
×
223

224

225
def get_metadata_from_filename(infile_pattern, filepath):
    """From the filename and its pattern get basic metadata of the satellite observations.

    Args:
        infile_pattern: trollsift pattern describing the input file name.
        filepath: Path to the input file.

    Returns:
        A dict with the parsed fields plus a derived 'end_time', or None
        when the filename does not match the pattern.
    """
    p__ = Parser(infile_pattern)
    fname = os.path.basename(filepath)
    try:
        res = p__.parse(fname)
    except ValueError:
        # Don't fail hard on an unexpected file name, but don't swallow the
        # problem silently either (the original had a bare "Do something!"):
        logger.warning("Failed to parse metadata from filename %s with pattern %s",
                       fname, infile_pattern)
        return None

    # Fix the end time: the filename only carries the end hour-of-day, so
    # rebuild a full datetime on the start date...
    endtime = datetime(res['start_time'].year, res['start_time'].month,
                       res['start_time'].day, res['end_hour'].hour, res['end_hour'].minute,
                       res['end_hour'].second)
    if endtime < res['start_time']:
        # ...and roll over to the next day if the granule crosses midnight.
        endtime = endtime + timedelta(days=1)

    res['end_time'] = endtime

    return res
245

246

247
def store(output_filename, detections):
    """Store the filtered AF detections on disk as CSV.

    Returns the output filename when something was written, otherwise None.
    """
    if len(detections) == 0:
        logger.debug("No detections to save!")
        return None

    detections.to_csv(output_filename, index=False)
    return output_filename
×
255

256

257
def geojson_feature_collection_from_detections(detections, platform_name=None):
    """Create the Geojson feature collection from fire detection data.

    Args:
        detections: DataFrame of detections, one row each, with at least
            latitude, longitude, tb, conf, power, starttime and endtime
            columns; tb_celcius and detection_id are used when present.
        platform_name: Optional satellite platform name added to each feature.

    Returns:
        A geojson FeatureCollection, or None when *detections* is empty.
    """
    if len(detections) == 0:
        logger.debug("No detections to save!")
        return None

    # Convert points to GeoJSON
    features = []
    for idx in range(len(detections)):
        # One positional lookup per detection instead of one per column:
        row = detections.iloc[idx]
        starttime = row.starttime
        endtime = row.endtime
        # Use the middle of the granule as the observation time:
        mean_granule_time = starttime.to_pydatetime() + (endtime.to_pydatetime() -
                                                         starttime.to_pydatetime()) / 2.

        prop = {'power': row.power,
                'tb': row.tb,
                'confidence': int(row.conf),
                'observation_time': json_serial(mean_granule_time)
                }

        # The Celsius TB and detection id columns are optional:
        try:
            prop['tb_celcius'] = row.tb_celcius
        except AttributeError:
            logger.debug("Failed adding the TB in celcius!")
        try:
            prop['id'] = row.detection_id
        except AttributeError:
            logger.debug("Failed adding the unique detection id!")

        if platform_name:
            prop['platform_name'] = platform_name
        else:
            logger.debug("No platform name specified for output")

        feat = Feature(
            geometry=Point(map(float, [row.longitude, row.latitude])),
            properties=prop)
        features.append(feat)

    return FeatureCollection(features)
4✔
299

300

301
def store_geojson(output_filename, feature_collection):
    """Store the Geojson feature collection of fire detections on disk.

    The destination directory is created if it does not already exist.
    """
    path = os.path.dirname(output_filename)
    # A bare filename yields an empty dirname, and os.makedirs('') raises -
    # so only try to create a directory when there actually is one.
    # exist_ok avoids a race if the directory appears between check and call.
    if path and not os.path.exists(path):
        logger.info("Create directory: %s", path)
        os.makedirs(path, exist_ok=True)

    with open(output_filename, 'w') as fpt:
        dump(feature_collection, fpt)
4✔
310

311

312
def get_mask_from_multipolygon(points, geometry, start_idx=1):
    """Get mask for points from a shapely Multipolygon.

    Args:
        points: Nx2 array of (x, y) coordinates, in the same projection as
            the polygon coordinates.
        geometry: A shapely MultiPolygon.
        start_idx: Index of the first constituent polygon to test after the
            initial one (polygon 0 is always tested first).

    Returns:
        Boolean array of length N - True where a point is inside any tested
        polygon. Only exterior rings are considered; holes are not subtracted.
    """
    # Test the first polygon on its own - if it already contains every
    # point we can return early without touching the remaining polygons.
    shape = geometry.geoms[0]
    pth = Path(shape.exterior.coords)
    mask = pth.contains_points(points)

    if sum(mask) == len(points):
        return mask

    # NOTE(review): slicing `geometry.geoms` and then reading `.geoms` on
    # the result relies on the slice being geometry-like - confirm this
    # holds for the shapely version pinned by the project.
    constituent_part = geometry.geoms[start_idx:]
    for shape in constituent_part.geoms:
        pth = Path(shape.exterior.coords)
        # OR-in the points contained by this polygon's exterior ring:
        mask = np.logical_or(mask, pth.contains_points(points))
        # Stop early once every point is accounted for.
        if sum(mask) == len(points):
            break

    return mask
4✔
329

330

331
def get_global_mask_from_shapefile(shapefile, lonlats, start_geom_index=0):
    """Given geographical (lon,lat) points get a mask to apply when filtering.

    The points are projected into the shapefile's coordinate system and
    tested against its (single) multi-polygon geometry.
    """
    longitudes, latitudes = lonlats
    logger.debug("Getting the global mask from file: shapefile file path = %s", str(shapefile))
    shape_geom = ShapeGeometry(shapefile)
    shape_geom.load()

    projection = pyproj.Proj(shape_geom.proj4str)

    # There is only one geometry/multi-polygon!
    geometry = shape_geom.geometries[0]

    xcoords, ycoords = projection(longitudes, latitudes)
    candidate_points = np.vstack([xcoords, ycoords]).T

    return get_mask_from_multipolygon(candidate_points, geometry, start_geom_index)
4✔
347

348

349
class ActiveFiresPostprocessing(Thread):
    """The active fires post processor.

    Listens for posttroll messages pointing at CSPP VIIRS AF output files,
    filters the detections (national borders, a false-alarm mask and
    optionally regional masks), stores the results as Geojson and publishes
    messages for the files written.
    """

    def __init__(self, configfile, shp_borders, shp_mask, regional_filtermask=None):
        """Initialize the active fires post processor class.

        Args:
            configfile: Path to the configuration file read by read_config.
            shp_borders: Shapefile with the national borders polygon(s).
            shp_mask: Shapefile used to mask out potential false detections.
            regional_filtermask: Optional shapefile for regional filtering.
        """
        super().__init__()
        self.shp_borders = shp_borders
        self.shp_filtermask = shp_mask

        self.regional_filtermask = regional_filtermask
        self.configfile = configfile
        self.options = {}

        config = read_config(self.configfile)
        self._set_options_from_config(config)

        self.host = socket.gethostname()
        self.timezone = self.options.get('timezone', 'GMT')

        self.input_topic = self.options['subscribe_topics'][0]
        self.output_topic = self.options['publish_topic']
        self.infile_pattern = self.options.get('af_pattern_ibands')

        self.outfile_pattern_national = self.options.get('geojson_file_pattern_national')
        self.outfile_pattern_regional = self.options.get('geojson_file_pattern_regional')

        self.output_dir = self.options.get('output_dir', '/tmp')
        # Path used to persist the running detection-id between runs:
        self.filepath_detection_id_cache = self.options.get('filepath_detection_id_cache')

        frmt = self.options['regional_shapefiles_format']
        self.regional_shapefiles_globstr = globify(frmt)

        self._fire_detection_id = None
        self._initialize_fire_detection_id()

        self.listener = None
        self.publisher = None
        self.loop = False
        self._setup_and_start_communication()

    def _setup_and_start_communication(self):
        """Set up the Posttroll communication and start the publisher."""
        logger.debug("Starting up... Input topic: %s", self.input_topic)
        now = datetime_utc2local(datetime.now(), self.timezone)
        logger.debug("Output times for timezone: {zone} Now = {time}".format(zone=str(self.timezone), time=now))

        tic = time.time()
        units = {'temperature': 'degC'}
        self.unit_converter = UnitConverter(units)
        # Elapsed time is now-minus-start (the original logged the negation):
        logger.debug("Unit conversion initialization with Pint took %f seconds.", time.time() - tic)

        self._check_borders_shapes_exists()

        self.listener = ListenerContainer(topics=[self.input_topic])
        self.publisher = NoisyPublisher("active_fires_postprocessing")
        self.publisher.start()
        self.loop = True
        # Allow a graceful shutdown on SIGTERM:
        signal.signal(signal.SIGTERM, self.signal_shutdown)

    def _set_options_from_config(self, config):
        """From the configuration on disk set the option dictionary, holding all metadata for processing."""
        for item in config:
            # Only plain (non-nested) config entries become options:
            if not isinstance(config[item], dict):
                self.options[item] = config[item]

        # Comma separated topic strings become lists, with empty entries
        # dropped. (The previous implementation removed items from the
        # list while iterating over it, which skips elements after a
        # removal - e.g. two consecutive empty entries.)
        if isinstance(self.options.get('subscribe_topics'), str):
            self.options['subscribe_topics'] = [
                topic for topic in self.options['subscribe_topics'].split(',') if topic]

        if isinstance(self.options.get('publish_topics'), str):
            self.options['publish_topics'] = [
                topic for topic in self.options['publish_topics'].split(',') if topic]

    def signal_shutdown(self, *args, **kwargs):
        """Shutdown the Active Fires postprocessing."""
        self.close()

    def check_incoming_message_and_get_filename(self, msg):
        """Check the message content and return filename if okay.

        Returns None (after possibly publishing "no fires" messages) when
        the message cannot be processed.
        """
        if msg.type not in ['file', 'collection', 'dataset']:
            logger.debug("Message type not supported: %s", str(msg.type))
            return None

        filename = get_filename_from_uri(msg.data.get('uri'))
        if not os.path.exists(filename):
            logger.warning("File does not exist: %s", filename)
            return None

        file_ok = check_file_type_okay(msg.data.get('type'))
        if not file_ok:
            # Not an AF txt file - tell downstream there is nothing to report:
            output_messages = self._generate_no_fires_messages(msg, NO_FIRES_TEXT)
            for output_msg in output_messages:
                logger.debug("Sending message: %s", str(output_msg))
                self.publisher.send(str(output_msg))
            return None

        return filename

    def do_postprocessing_on_message(self, msg, filename):
        """Do the fires post processing on a message."""
        platform_name = msg.data.get('platform_name')
        af_shapeff = ActiveFiresShapefileFiltering(filename, platform_name=platform_name,
                                                   timezone=self.timezone)
        afdata = af_shapeff.get_af_data(self.infile_pattern)
        if len(afdata) == 0:
            output_messages = self._generate_no_fires_messages(msg, NO_FIRES_TEXT)
            for output_msg in output_messages:
                logger.debug("Sending message: %s", str(output_msg))
                self.publisher.send(str(output_msg))
            return

        afdata = self.fires_filtering(msg, af_shapeff)
        logger.debug("After fires_filtering...: Number of fire detections = %d", len(afdata))
        if len(afdata) == 0:
            logger.debug("No fires - so no regional filtering to be done!")
            return

        # Detections that survived filtering get persistent unique ids:
        afdata = self.add_unique_day_id(afdata)
        self.save_id_to_file()

        afdata = self.add_tb_celcius(afdata)

        # 1) Create geojson feature collection
        # 2) Dump geojson data to disk
        feature_collection = geojson_feature_collection_from_detections(afdata,
                                                                        platform_name=af_shapeff.platform_name)

        fmda = af_shapeff.metadata
        pout = Parser(self.outfile_pattern_national)
        out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
        logger.debug("Output file path = %s", out_filepath)

        if feature_collection is None:
            logger.info("No geojson file created, number of fires after filtering = %d", len(afdata))
            output_messages = self._generate_no_fires_messages(msg,
                                                               'No true fire detections inside National borders')  # noqa
        else:
            store_geojson(out_filepath, feature_collection)
            output_messages = self.get_output_messages(out_filepath, msg, len(afdata))

        for output_msg in output_messages:
            if output_msg:
                logger.debug("Sending message: %s", str(output_msg))
                self.publisher.send(str(output_msg))

        # Do the regional filtering now:
        if not self.regional_filtermask:
            logger.info("No regional filtering is attempted.")
            return

        # FIXME! If afdata is empty (len=0) then it seems all data are inside all regions!
        af_shapeff = ActiveFiresShapefileFiltering(afdata=afdata, platform_name=platform_name,
                                                   timezone=self.timezone)
        regional_fmask = af_shapeff.get_regional_filtermasks(self.regional_filtermask,
                                                             globstr=self.regional_shapefiles_globstr)
        regional_messages = self.regional_fires_filtering_and_publishing(msg, regional_fmask, af_shapeff)
        for region_msg in regional_messages:
            logger.debug("Sending message: %s", str(region_msg))
            self.publisher.send(str(region_msg))

    def run(self):
        """Run the AF post processing."""
        while self.loop:
            try:
                msg = self.listener.output_queue.get(timeout=1)
                logger.debug("Message: %s", str(msg.data))
            except Empty:
                continue
            else:
                filename = self.check_incoming_message_and_get_filename(msg)
                if not filename:
                    continue

                self.do_postprocessing_on_message(msg, filename)

    def regional_fires_filtering_and_publishing(self, msg, regional_fmask, afsff_obj):
        """From the regional-fires-filter-mask and the fire detection data send regional messages."""
        logger.debug("Perform regional masking on VIIRS AF detections and publish accordingly.")

        afdata = afsff_obj.get_af_data()
        fmda = afsff_obj.metadata

        fmda['platform'] = afsff_obj.platform_name

        pout = Parser(self.outfile_pattern_regional)

        output_messages = []
        regions_with_detections = 0
        for region_name in regional_fmask:
            # Skip regions without any detection inside them:
            if not regional_fmask[region_name]['some_inside_test_area']:
                continue

            regions_with_detections = regions_with_detections + 1
            fmda['region_name'] = regional_fmask[region_name]['attributes']['Kod_omr']

            out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
            logger.debug("Output file path = %s", out_filepath)
            data_in_region = afdata[regional_fmask[region_name]['mask']]

            feature_collection = geojson_feature_collection_from_detections(data_in_region,
                                                                            platform_name=fmda['platform'])
            if feature_collection is None:
                logger.warning("Something wrong happened storing regional " +
                               "data to Geojson - area: {name}".format(name=str(region_name)))
                continue

            store_geojson(out_filepath, feature_collection)

            outmsg = self._generate_output_message(out_filepath, msg, regional_fmask[region_name])
            output_messages.append(outmsg)
            logger.info("Geojson file created! Number of fires in region = %d", len(data_in_region))

        logger.debug("Regional masking done. Number of regions with fire " +
                     "detections on this granule: %s" % str(regions_with_detections))
        return output_messages

    def fires_filtering(self, msg, af_shapeff):
        """Read Active Fire data and perform spatial filtering removing false detections.

        Do the national filtering first, and then filter out potential false
        detections by the special mask for that.

        """
        logger.debug("Read VIIRS AF detections and perform quality control and spatial filtering")

        fmda = af_shapeff.metadata
        # metdata contains time and everything but it is not being transfered to the dataframe.attrs

        pout = Parser(self.outfile_pattern_national)
        out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
        logger.debug("Output file path = %s", out_filepath)

        # National filtering:
        af_shapeff.fires_filtering(self.shp_borders)

        # Metadata should be transfered here!
        afdata_ff = af_shapeff.get_af_data()

        if len(afdata_ff) > 0:
            logger.debug("Doing the fires filtering: shapefile-mask = %s", str(self.shp_filtermask))
            af_shapeff.fires_filtering(self.shp_filtermask, start_geometries_index=0, inside=False)
            afdata_ff = af_shapeff.get_af_data()
            logger.debug("After fires_filtering: Number of fire detections left: %d", len(afdata_ff))

        return afdata_ff

    def get_output_messages(self, filepath, msg, number_of_data):
        """Generate the adequate output message(s) depending on if an output file was created or not."""
        logger.info("Geojson file created! Number of fires = %d", number_of_data)
        return [self._generate_output_message(filepath, msg)]

    def _generate_output_message(self, filepath, input_msg, region=None):
        """Create the output message to publish."""
        output_topic = generate_posttroll_topic(self.output_topic, region)
        to_send = prepare_posttroll_message(input_msg, region)
        to_send['uri'] = ('ssh://%s/%s' % (self.host, filepath))
        to_send['uid'] = os.path.basename(filepath)
        to_send['type'] = 'GEOJSON-filtered'
        to_send['format'] = 'geojson'
        to_send['product'] = 'afimg'
        pubmsg = Message(output_topic, 'file', to_send)
        return pubmsg

    def _generate_no_fires_messages(self, input_msg, msg_string):
        """Create the output messages to publish."""
        to_send = prepare_posttroll_message(input_msg)
        to_send['info'] = msg_string
        publish_messages = []
        for ext in ['National', 'Regional']:
            topic = self.output_topic + '/' + ext
            publish_messages.append(Message(topic, 'info', to_send))

        return publish_messages

    def _check_borders_shapes_exists(self):
        """Check that the national borders shapefile exists on disk."""
        if not os.path.exists(self.shp_borders):
            raise OSError("Shape file does not exist! Filename = %s" % self.shp_borders)

    def _initialize_fire_detection_id(self):
        """Initialize the fire detection ID - from the cache file when available."""
        if self.filepath_detection_id_cache and os.path.exists(self.filepath_detection_id_cache):
            self._fire_detection_id = self.get_id_from_file()
        else:
            self._fire_detection_id = {'date': datetime.utcnow(), 'counter': 0}

    def update_fire_detection_id(self):
        """Update the fire detection ID registry.

        The counter is reset (via re-initialization) when more than a day
        has passed, or when the calendar day has changed; then it is bumped.
        """
        now = datetime.utcnow()
        tdelta = now - self._fire_detection_id['date']
        if tdelta.total_seconds() > 24*3600:
            self._initialize_fire_detection_id()
        elif tdelta.total_seconds() > 0 and self._fire_detection_id['date'].day != now.day:
            self._initialize_fire_detection_id()

        self._fire_detection_id['counter'] = self._fire_detection_id['counter'] + 1

    def save_id_to_file(self):
        """Save the (current) detection id on disk.

        It is assumed that the user permissions are so that a file can actually
        be written to disk here! When no cache file path is configured this is
        a no-op (the original raised TypeError from open(None), e.g. on close()).
        """
        if not self.filepath_detection_id_cache:
            return
        with open(self.filepath_detection_id_cache, 'w') as fpt:
            id_ = self._create_id_string()
            fpt.write(id_)

    def get_id_from_file(self):
        """Read the latest stored detection id string from disk and convert to internal format."""
        with open(self.filepath_detection_id_cache, 'r') as fpt:
            idstr = fpt.read()

        return self._get_id_from_string(idstr)

    def _get_id_from_string(self, idstr):
        """Get the detection id ({'date': ..., 'counter': ...}) from an id string."""
        datestr, counter = idstr.split('-')
        return {'date': datetime.strptime(datestr, '%Y%m%d'),
                'counter': int(counter)}

    def _create_id_string(self):
        """From the internal fire detection id create the id string to be exposed to the user."""
        return (self._fire_detection_id['date'].strftime('%Y%m%d') +
                '-' + str(self._fire_detection_id['counter']))

    def add_unique_day_id(self, afdata):
        """Add a unique detection id - date + a running number for the day."""
        # Add id's to the detections:
        id_list = []
        for _i in range(len(afdata)):
            self.update_fire_detection_id()
            id_ = self._create_id_string()
            id_list.append(id_)

        afdata['detection_id'] = id_list
        return afdata

    def add_tb_celcius(self, data_frame):
        """Add a column with TB in Celcius to the fire detection data frame."""
        tbc_list = []
        for _i in range(len(data_frame)):
            # Pint quantity - keep only the magnitude in the dataframe:
            tbc = self.unit_converter.convert('temperature', data_frame['tb'].iloc[_i])
            tbc_list.append(tbc.magnitude)

        data_frame['tb_celcius'] = tbc_list
        return data_frame

    def close(self):
        """Shutdown the Active Fires postprocessing."""
        logger.info('Terminating Active Fires post processing.')
        self.loop = False
        logger.info('Dumping the latest detection id to disk: %s', str(self.filepath_detection_id_cache))
        self.save_id_to_file()
        try:
            self.listener.stop()
        except Exception:
            logger.exception("Couldn't stop listener.")
        if self.publisher:
            try:
                self.publisher.stop()
            except Exception:
                logger.exception("Couldn't stop publisher.")
×
746

747

748
def check_file_type_okay(file_type):
    """Check if the file is of the correct type.

    Only 'txt'/'TXT' files are accepted; anything else is logged and rejected.
    """
    if file_type in ('txt', 'TXT'):
        return True

    logger.info('File type not txt: %s', str(file_type))
    return False
4✔
754

755

756
def get_filename_from_uri(uri):
    """Get the file name from the uri given."""
    logger.info('File uri: %s', str(uri))
    parsed = urlparse(uri)
    return parsed.path
4✔
761

762

763
def generate_posttroll_topic(output_topic, region=None):
    """Create the topic for the posttroll message to publish.

    Without a region the national sub-topic is used; with a region the
    regional sub-topic carries the region code ('Kod_omr').
    """
    if not region:
        return output_topic + '/National'

    return output_topic + '/Regional/' + region['attributes']['Kod_omr']
4✔
771

772

773
def prepare_posttroll_message(input_msg, region=None):
    """Create the basic posttroll-message fields and return.

    Copies the incoming message data, strips the file-specific fields, and
    attaches the region name/code when a region is given.
    """
    to_send = input_msg.data.copy()
    for field in ('dataset', 'collection', 'uri', 'uid', 'format', 'type'):
        to_send.pop(field, None)

    # FIXME! Check that the region_name is stored as a unicode string!
    if region:
        to_send['region_name'] = region['attributes']['Testomr']
        to_send['region_code'] = region['attributes']['Kod_omr']

    return to_send
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc