• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pytroll / pytroll-pps-runner / 9282584827

29 May 2024 08:14AM UTC coverage: 75.907% (+1.0%) from 74.884%
9282584827

Pull #62

github

web-flow
Merge e7a399b31 into 311997dcf
Pull Request #62: PPS product and level1c collector

389 of 557 branches covered (69.84%)

Branch coverage included in aggregate %.

100 of 135 new or added lines in 6 files covered. (74.07%)

19 existing lines in 2 files now uncovered.

1473 of 1896 relevant lines covered (77.69%)

3.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.22
/nwcsafpps_runner/pps_posttroll_hook.py
1
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (c) 2018 - 2021 Adam.Dybbroe

# Author(s):

#   Adam.Dybbroe <adam.dybbroe@smhi.se>

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""A PPS processing post hook to be run once PPS has finished a PGE.

The hook uses Posttroll to send messages notifying the completion of a PGE.

The metadata passed to a hook is::

    metadata = {'module': Name of the PPS-script calling this hook.
                'pps_version': PPS version (string)
                'platform_name': Name of the satellite
                'orbit': Orbit number (Can be 00000 or 99999 if unknown, e.g. for MODIS or GAC)
                'sensor': Name of the satellite sensor
                'start_time': Start time of the satellite scene. (string)
                'end_time': End time of the satellite scene. (string)
                'filename': The name of the file that is supposed to be produced by this module. It can also
                            be a list of filenames.
                'file_was_already_processed': A Boolean value. If True, the file(s) to be produced by the script already
                                             exist; thus the script did basically nothing, but ended with status
                                             SUCCEED. If False, the output file did not exist beforehand. But this
                                             parameter does not tell if the script failed or succeeded. (Use 'status'
                                             for that information.)
               }

NOTE: the VIIRS segment check in this module computes ``end_time - start_time``,
so for that check to work these two values need to be datetime-like objects
rather than plain strings.

The hook is initialized when the yaml config file is read, so it needs a `__setstate__` method.

"""
47

48
import logging
4✔
49
import os
4✔
50
import socket
4✔
51
import threading
4✔
52
import time
4✔
53
from datetime import timedelta
4✔
54
from multiprocessing import Manager
4✔
55

56
from posttroll.message import Message
4✔
57
from posttroll.publisher import Publish
4✔
58

59
LOG = logging.getLogger(__name__)

# NOTE(review): these two thresholds are not referenced in this module; they
# are kept in case other modules import them.
VIIRS_TIME_THR1 = timedelta(seconds=81)
VIIRS_TIME_THR2 = timedelta(seconds=87)

# Seconds to let the posttroll publisher register before the first message is sent.
WAIT_SECONDS_TO_ALLOW_PUBLISHER_TO_BE_REGISTERED = 2.2

# PPS module (script) name -> product id, available as ``pps_product`` in the publish topic.
PPS_PRODUCT_FILE_ID = {'ppsMakeAvhrr': 'RAD_SUN',
                       'ppsMakeViirs': 'RAD_SUN',
                       'ppsMakePhysiography': 'PHY',
                       'ppsMakeNwp': 'NWP',
                       'ppsCmaskPrepare': 'CMA-PRE',
                       'ppsCmask': 'CMA',
                       'ppsCmaskProb': 'CMAProb',
                       'ppsCtth': 'CTTH',
                       'ppsCtype': 'CT',
                       'ppsCpp': 'CPP',
                       'ppsCmic': 'CMIC',
                       'ppsHrw': 'HRW',
                       'ppsPrecip': 'PC',
                       'ppsPrecipPrepare': 'PC-PRE'}

# PPS platform names -> OSCAR platform names (used for 'platform_name' in the message).
PLATFORM_CONVERSION_PPS2OSCAR = {'noaa20': 'NOAA-20',
                                 'noaa21': 'NOAA-21',
                                 'noaa22': 'NOAA-22',
                                 'noaa23': 'NOAA-23',
                                 'metopsga1': 'Metop-SG-A1',
                                 'metopsga2': 'Metop-SG-A2',
                                 'metopsga3': 'Metop-SG-A3',
                                 'noaa19': 'NOAA-19',
                                 'noaa18': 'NOAA-18',
                                 'noaa15': 'NOAA-15',
                                 'metop02': 'Metop-A',
                                 'metop01': 'Metop-B',
                                 'metop03': 'Metop-C',
                                 'npp': 'Suomi-NPP',
                                 'eos1': 'EOS-Terra',
                                 'eos2': 'EOS-Aqua',
                                 }

# yaml config key -> key used in the message content.
MANDATORY_FIELDS_FROM_YAML = {'level': 'data_processing_level',
                              'output_format': 'format',
                              'variant': 'variant',
                              'geo_or_polar': 'geo_or_polar',
                              'software': 'software'}

# Short variant codes from the yaml config -> name used in the publish topic.
VARIANT_TRANSLATE = {'DR': 'direct_readout'}

# One nominal VIIRS granule is 48 scans. The duration of one scan is 1.779 seconds.
# Thus one granule is 1.779*48 = 85.4 seconds long.
# Sometimes an SDR granule may be shorter if one or more scans are missing.
# https://ncc.nesdis.noaa.gov/documents/documentation/viirs-users-guide-tech-report-142a-v1.3.pdf
# Check page 37!
SEC_DURATION_ONE_SCAN = 1.779
# Backward-compatible name. Despite "GRANULE", this is the duration of ONE SCAN;
# it is added to (end_time - start_time) when deriving the scene length.
SEC_DURATION_ONE_GRANULE = SEC_DURATION_ONE_SCAN
# A VIIRS scene whose length lies strictly between these bounds is treated as a
# single granule ("segment"). Despite the "_SECONDS" suffix these are timedeltas.
MIN_VIIRS_GRANULE_LENGTH_SECONDS = timedelta(seconds=60)
MAX_VIIRS_GRANULE_LENGTH_SECONDS = timedelta(seconds=88)
115

116

117
class PPSPublisher(threading.Thread):
    """Background thread that publishes PPS messages via posttroll.

    Encoded messages are taken from the queue one at a time and sent. A
    ``None`` item on the queue (queued by :meth:`stop`) ends the thread.
    """

    def __init__(self, queue, nameservers=None):
        """Set up the publisher thread.

        Args:
            queue: Queue holding encoded messages; ``None`` acts as the stop sentinel.
            nameservers: Optional nameservers handed on to posttroll's ``Publish``.
        """
        super().__init__()
        self.queue = queue
        self.nameservers = nameservers

    def stop(self):
        """Ask the publishing loop to finish by queueing the ``None`` sentinel."""
        self.queue.put(None)

    def run(self):
        """Publish queued messages until the ``None`` sentinel arrives."""
        with Publish('PPS', 0, nameservers=self.nameservers) as publisher:
            # Give the publisher time to register before anything is sent.
            time.sleep(WAIT_SECONDS_TO_ALLOW_PUBLISHER_TO_BE_REGISTERED)

            while True:
                item = self.queue.get()
                if item is None:
                    # Sentinel: pause briefly, then leave while the publisher is still open.
                    time.sleep(1.0)
                    break
                LOG.info("Publish the message...")
                publisher.send(item)
                LOG.info("Message published!")
149

150

151
class PPSMessage(object):
    """A Posttroll hook that triggers a notification when a PPS PGE is ready.

    The hook is normally instantiated from the yaml config file. In that case
    ``__init__`` is not run, and the static metadata is restored through
    :meth:`__setstate__`.
    """

    def __init__(self, description, metadata):
        """Initialize the hook when it is constructed directly rather than from yaml.

        Args:
            description: Free-text description of the hook; stored but not used by this class.
            metadata: Static metadata (the fields otherwise given in the yaml config).
                A shallow copy is stored; ``None`` gives an empty dict.
        """
        # __init__ is not run when created from yaml (see http://pyyaml.org/ticket/48),
        # so that path relies on __setstate__. Setting ``metadata`` here as well keeps a
        # directly constructed hook usable: __call__ and __getstate__ need self.metadata.
        self.description = description
        self.metadata = dict(metadata) if metadata is not None else {}

    def __getstate__(self):
        """Return the state to serialize: only the static metadata.

        Example - metadata:
        posttroll_topic: "/PPSv2018"
        station: "norrkoping"
        output_format: "CF"
        level: "2"
        variant: DR
        geo_or_polar: "polar"
        software: "NWCSAF-PPSv2018"
        """
        return {'metadata': self.metadata}

    def __setstate__(self, mydict):
        """Restore the hook (e.g. from the yaml config); *mydict* must contain 'metadata'."""
        self.metadata = mydict['metadata']

    def __call__(self, status, mda):
        """Send the message based on the metadata and the fields picked up from the yaml config.

        Args:
            status: Exit status of the PPS module (0 means success; otherwise no message is sent).
            mda: Dynamic metadata passed by PPS (see the module docstring for its keys).
        """
        self._collect_all_metadata(mda)
        message = PostTrollMessage(status, self.metadata)
        message.send()

    def _collect_all_metadata(self, mda):
        """Collect the static (from yaml config) and dynamic metadata into one dict.

        NOTE(review): ``self.metadata`` is updated in place, so dynamic keys from one
        call remain on the hook for later calls - confirm this is intended.
        """
        self.metadata.update(mda)
189

190

191
class PostTrollMessage(object):
    """Create a Posttroll message from metadata."""

    def __init__(self, status, metadata):
        """Initialize the object and validate the metadata.

        Args:
            status: Exit status of the PPS module. Only 0 leads to a message being sent.
            metadata: Dict with the static (yaml config) and dynamic (PPS) metadata.

        Raises:
            AttributeError: If 'start_time' or 'end_time' is missing from *metadata*.
            KeyError: If 'filename' is missing from *metadata*.
        """
        self.metadata = metadata
        self.status = status
        self._to_send = {}
        self.viirs_granule_time_bounds = (MIN_VIIRS_GRANULE_LENGTH_SECONDS,
                                          MAX_VIIRS_GRANULE_LENGTH_SECONDS)
        # Check that the metadata has what is required:
        self.check_metadata_contains_mandatory_parameters()
        self.check_metadata_contains_filename()
        self.nameservers = self.get_nameservers()

        for key, value in self.metadata.items():
            LOG.debug("%s = %s", str(key), str(value))

    def get_nameservers(self):
        """Get the nameservers from the metadata. Defaults to None.

        A truthy 'nameservers' value that is not a list is replaced by None
        (with a warning); any other value is returned unchanged.
        """
        nameservers = self.metadata.get('nameservers', None)
        if nameservers and not isinstance(nameservers, list):
            LOG.warning("Nameserver metadata must be a list. Setting to None.")
            nameservers = None
        return nameservers

    def check_metadata_contains_mandatory_parameters(self):
        """Check that the start and end times are available in the metadata.

        Raises:
            AttributeError: If 'start_time' or 'end_time' is missing.
        """
        for attr in ('start_time', 'end_time'):
            if attr not in self.metadata:
                raise AttributeError("%s is a required attribute but is missing in metadata!" % attr)

    def check_metadata_contains_filename(self):
        """Check that the input metadata structure contains filename.

        Raises:
            KeyError: If 'filename' is missing.
        """
        if 'filename' not in self.metadata:
            raise KeyError('filename')

    def check_mandatory_fields(self):
        """Check that the mandatory yaml fields are available in the metadata dict.

        All keys of MANDATORY_FIELDS_FROM_YAML (level, output_format, variant,
        geo_or_polar and software) are required, unless 'posttroll_topic' is
        specified, in which case nothing is checked.

        Raises:
            AttributeError: If a mandatory field is missing.
        """
        if 'posttroll_topic' in self.metadata:
            return

        for attr in MANDATORY_FIELDS_FROM_YAML:
            if attr not in self.metadata:
                raise AttributeError("pps_hook must contain metadata attribute %s" % attr)

    def send(self):
        """Create and publish (send) the message.

        Nothing is sent if the PPS module failed (non-zero status); a warning is logged instead.
        """
        if self.status != 0:
            LOG.warning("Module %s failed, so no message sent", self.metadata.get('module', 'unknown'))
            return

        pubmsg = self.create_message("OK")
        self.publish_message(pubmsg)

    def publish_message(self, mymessage):
        """Publish the message.

        The message is encoded and handed over to a PPSPublisher thread, followed
        by the stop sentinel so the thread ends after sending it.

        Args:
            mymessage: Dict with 'header' (topic), 'type' and 'content', as returned by
                :meth:`create_message`.
        """
        posttroll_msg = Message(mymessage['header'], mymessage['type'], mymessage['content'])
        msg_to_publish = posttroll_msg.encode()

        # NOTE(review): a multiprocessing Manager (with its own server process) is
        # started for every message; a plain thread-safe queue may suffice since
        # only a thread in this process consumes it.
        manager = Manager()
        publisher_q = manager.Queue()

        pub_thread = PPSPublisher(publisher_q, self.nameservers)
        pub_thread.start()
        LOG.info("Sending: %s", str(msg_to_publish))
        publisher_q.put(msg_to_publish)
        pub_thread.stop()

    def create_message(self, status):
        """Create the posttroll message from the PPS metadata.

        The metadata provided by pps has the following keys: module, pps_version, platform_name, orbit, sensor,
        start_time, end_time, filename, file_was_already_processed.
        This class adds also the following metadata keys: pps_product
        Also the extra metadata provided in the configuration yaml file is available.
        That way, the publish_topic can be a pattern with metadata keys in it, eg::

          '/my/pps/publish/topic/{pps_product}/{sensor}/'

        Returns:
            Dict with 'header' (the topic), 'type' ('dataset' when the content has a
            'dataset' entry, i.e. several files, otherwise 'file') and 'content'.
        """
        self._to_send = self.create_message_content_from_metadata()
        self._to_send.update({'status': status})
        # Add uri/uids to message content
        self._to_send.update(self.get_message_with_uri_and_uid())

        self.fix_mandatory_fields_in_message()
        self.clean_unused_keys_in_message()

        publish_topic = self._create_message_topic()
        msg_type = 'dataset' if 'dataset' in self._to_send else 'file'
        return {'header': publish_topic, 'type': msg_type, 'content': self._to_send}

    def _create_message_topic(self):
        """Create the publish topic from yaml file items and PPS metadata.

        A 'publish_topic' pattern from the metadata takes precedence. The default
        topic, which needs the VIIRS segment check, is only built when no pattern
        is configured.
        """
        to_send = self._to_send.copy()
        to_send["pps_product"] = PPS_PRODUCT_FILE_ID.get(self.metadata.get('module', 'unknown'), 'UNKNOWN')
        # 'variant' can be absent when 'posttroll_topic' bypassed the mandatory-field check.
        if 'variant' in to_send:
            to_send["variant"] = VARIANT_TRANSLATE.get(to_send['variant'], to_send['variant'])

        # Build the default lazily: it computes end_time - start_time for VIIRS,
        # which should not be needed (nor fail) when a pattern is configured.
        topic_pattern = to_send.get('publish_topic')
        if topic_pattern is None:
            topic_pattern = self._create_default_topic()

        return topic_pattern.format(**to_send)

    def _create_default_topic(self):
        """Build the default topic pattern; VIIRS single-granule scenes get a '/segment' prefix."""
        topic = '/segment' if self.is_segment() else ""
        topic_pattern = "/".join((topic,
                                  "{geo_or_polar}",
                                  "{variant}",
                                  "{format}",
                                  "{data_processing_level}",
                                  "{pps_product}",
                                  "{software}",
                                  ""))
        return topic_pattern

    def create_message_content_from_metadata(self):
        """Create message content from the metadata.

        All metadata is copied except 'filename' (uri/uid are used instead, see
        :meth:`get_message_with_uri_and_uid`); 'platform_name' is converted to the
        OSCAR name when known.
        """
        msg = {key: value for key, value in self.metadata.items() if key != 'filename'}
        if 'platform_name' in msg:
            platform = msg['platform_name']
            msg['platform_name'] = PLATFORM_CONVERSION_PPS2OSCAR.get(platform, platform)
        return msg

    def fix_mandatory_fields_in_message(self):
        """Copy the mandatory yaml fields into the message under their message key names.

        Raises:
            AttributeError: If a mandatory field is missing (see :meth:`check_mandatory_fields`).
        """
        self.check_mandatory_fields()

        for yaml_key, msg_key in MANDATORY_FIELDS_FROM_YAML.items():
            # Fields may legitimately be absent when 'posttroll_topic' skipped the check.
            if yaml_key in self.metadata:
                self._to_send[msg_key] = self.metadata[yaml_key]

    def clean_unused_keys_in_message(self):
        """Clean away the yaml keyword names that were renamed in the message."""
        renamed_to = set(MANDATORY_FIELDS_FROM_YAML.values())
        for attr in MANDATORY_FIELDS_FROM_YAML:
            if attr not in renamed_to:
                # pop(): the key may be missing when 'posttroll_topic' skipped the check.
                self._to_send.pop(attr, None)

    def get_message_with_uri_and_uid(self):
        """Generate a dict with the uri and uid's and return it.

        A list of filenames gives ``{'dataset': [{'uri': ..., 'uid': ...}, ...]}``;
        a single filename gives ``{'uri': ...}`` plus 'uid' unless the metadata
        already provides one. Returns an empty dict if there is no 'filename'.
        """
        if 'filename' not in self.metadata:
            return {}

        servername = socket.gethostname()
        LOG.debug("Servername = %s", str(servername))

        msg = {}
        filenames = self.metadata['filename']
        if isinstance(filenames, list):
            msg['dataset'] = [{'uri': os.path.abspath(fname), 'uid': os.path.basename(fname)}
                              for fname in filenames]
        else:
            filename = filenames
            msg['uri'] = os.path.abspath(filename)
            if 'uid' not in self.metadata:
                LOG.debug("Add uid as it was not included in the metadata from PPS")
                LOG.debug("Filename = %s", filename)
                msg['uid'] = os.path.basename(filename)

        return msg

    def is_segment(self):
        """Determine if the scene is a 'segment'.

        That means a sensor data granule, e.g. 85 seconds of VIIRS. Only VIIRS
        scenes whose length lies strictly within ``viirs_granule_time_bounds``
        count as segments.
        """
        if not self.sensor_is_viirs():
            LOG.debug("Scene is not a VIIRS scene - and we assume then not a segment of a larger scene")
            return False

        delta_t = self.get_granule_duration()
        LOG.debug("Scene length: %s", str(delta_t.total_seconds()))
        lower, upper = self.viirs_granule_time_bounds
        if lower < delta_t < upper:
            LOG.info("VIIRS scene is a segment. Scene length = %s", str(delta_t))
            return True

        LOG.debug("VIIRS scene is not a segment")
        return False

    def sensor_is_viirs(self):
        """Check if the sensor is equal to VIIRS."""
        sensor = self.metadata.get('sensor')
        LOG.debug("Sensor = %s", str(sensor))
        return sensor == 'viirs'

    def get_granule_duration(self):
        """Derive the scene/granule duration as a timedelta object.

        Requires 'start_time' and 'end_time' to support subtraction (e.g. datetime
        objects). One scan duration (SEC_DURATION_ONE_GRANULE) is added, presumably
        because the times mark scan starts - TODO confirm.
        """
        starttime = self.metadata['start_time']
        endtime = self.metadata['end_time']
        return endtime - starttime + timedelta(seconds=SEC_DURATION_ONE_GRANULE)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc