• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pytroll / pytroll-pps-runner / 9282584827

29 May 2024 08:14AM UTC coverage: 75.907% (+1.0%) from 74.884%
9282584827

Pull #62

github

web-flow
Merge e7a399b31 into 311997dcf
Pull Request #62: PPS product and level1c collector

389 of 557 branches covered (69.84%)

Branch coverage included in aggregate %.

100 of 135 new or added lines in 6 files covered. (74.07%)

19 existing lines in 2 files now uncovered.

1473 of 1896 relevant lines covered (77.69%)

3.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.04
/nwcsafpps_runner/utils.py
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3

4
# Copyright (c) 2018 - 2022 Pytroll Developers
5

6
# Author(s):
7

8
#   Adam.Dybbroe <Firstname.Lastname at smhi.se>
9

10
# This program is free software: you can redistribute it and/or modify
11
# it under the terms of the GNU General Public License as published by
12
# the Free Software Foundation, either version 3 of the License, or
13
# (at your option) any later version.
14

15
# This program is distributed in the hope that it will be useful,
16
# but WITHOUT ANY WARRANTY; without even the implied warranty of
17
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18
# GNU General Public License for more details.
19

20
# You should have received a copy of the GNU General Public License
21
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
22

23
"""Utility functions for NWCSAF/pps runner(s)."""
4✔
24

25
import logging
4✔
26
import os
4✔
27
import shlex
4✔
28
import socket
4✔
29
import threading
4✔
30
from glob import glob
4✔
31
from subprocess import PIPE, Popen
4✔
32
from urllib.parse import urlparse
4✔
33

34
from posttroll.address_receiver import get_local_ips
4✔
35
# from trollsift import Parser
36
from posttroll.message import Message  # @UnresolvedImport
4✔
37
from trollsift.parser import parse  # @UnresolvedImport
4✔
38

39
LOG = logging.getLogger(__name__)
4✔
40

41

42
class NwpPrepareError(Exception):
    """Error type associated with preparing NWP data for PPS."""
4✔
44

45

46
class FindTimeControlFileError(Exception):
    """Error type for time control file lookups.

    NOTE(review): not raised anywhere in the visible part of this module;
    presumably used by callers when a time control file cannot be found.
    """
4✔
48

49

50
class PpsRunError(Exception):
    """Error type for PPS runs.

    NOTE(review): not raised anywhere in the visible part of this module;
    presumably used by the runner(s) when a PPS run fails.
    """
4✔
52

53

54
# trollsift patterns used to parse the names of PPS result files.
# Single-segment product file.
PPS_OUT_PATTERN = (
    "S_NWC_{segment}_{orig_platform_name}_{orbit_number:05d}_"
    "{start_time:%Y%m%dT%H%M%S%f}Z_{end_time:%Y%m%dT%H%M%S%f}Z.{extention}"
)
# Product file whose segment consists of two underscore-separated parts.
PPS_OUT_PATTERN_MULTIPLE = (
    "S_NWC_{segment1}_{segment2}_{orig_platform_name}_{orbit_number:05d}_"
    "{start_time:%Y%m%dT%H%M%S%f}Z_{end_time:%Y%m%dT%H%M%S%f}Z.{extention}"
)
# XML statistics file.
PPS_STAT_PATTERN = (
    "S_NWC_{segment}_{orig_platform_name}_{orbit_number:05d}_"
    "{start_time:%Y%m%dT%H%M%S%f}Z_{end_time:%Y%m%dT%H%M%S%f}Z_statistics.xml"
)
60

61
# Platform names grouped per sensor family.
SUPPORTED_AVHRR_SATELLITES = [
    'NOAA-15', 'NOAA-18', 'NOAA-19',
    'Metop-B', 'Metop-A', 'Metop-C',
]
# NOTE: the EARS list is kept separately and is not part of the aggregate below.
SUPPORTED_EARS_AVHRR_SATELLITES = ['Metop-B', 'Metop-C']
SUPPORTED_MODIS_SATELLITES = ['EOS-Terra', 'EOS-Aqua']
SUPPORTED_VIIRS_SATELLITES = [
    'Suomi-NPP', 'NOAA-20', 'NOAA-21', 'NOAA-22', 'NOAA-23',
]
SUPPORTED_SEVIRI_SATELLITES = ['Meteosat-09', 'Meteosat-10', 'Meteosat-11']
SUPPORTED_METIMAGE_SATELLITES = ['Metop-SG-A1', 'Metop-SG-A2', 'Metop-SG-A3']

# Every platform handled by PPS, in the order AVHRR, MODIS, SEVIRI, MetImage, VIIRS.
SUPPORTED_PPS_SATELLITES = [
    *SUPPORTED_AVHRR_SATELLITES,
    *SUPPORTED_MODIS_SATELLITES,
    *SUPPORTED_SEVIRI_SATELLITES,
    *SUPPORTED_METIMAGE_SATELLITES,
    *SUPPORTED_VIIRS_SATELLITES,
]
74

75
# File name prefixes per EOS platform (geolocation and 1 km data files).
GEOLOC_PREFIX = {
    'EOS-Aqua': 'MYD03',
    'EOS-Terra': 'MOD03',
}
DATA1KM_PREFIX = {
    'EOS-Aqua': 'MYD021km',
    'EOS-Terra': 'MOD021km',
}

# Sensor names known to this module.
PPS_SENSORS = [
    'amsu-a', 'amsu-b', 'mhs', 'avhrr/3',
    'viirs', 'modis', 'seviri', 'metimage',
]
NOAA_METOP_PPS_SENSORNAMES = ['avhrr/3', 'amsu-a', 'amsu-b', 'mhs']

# Translations between the numbered and the lettered Metop naming.
METOP_NAME_LETTER = {
    'metop01': 'metopb',
    'metop02': 'metopa',
    'metop03': 'metopc',
}
METOP_NAME = {
    'metop01': 'Metop-B',
    'metop02': 'Metop-A',
    'metop03': 'Metop-C',
}
# Reverse lookup of METOP_NAME_LETTER (lettered name -> numbered name).
METOP_NAME_INV = {letter: numbered for numbered, letter in METOP_NAME_LETTER.items()}
4✔
84

85
# SATELLITE_NAME = {}
86
# for sat in SUPPORTED_PPS_SATELLITES:
87
#     SATELLITE_NAME[sat] = sat.lower().replace('-', '')
88
# historic exceptions
89
# SATELLITE_NAME['Suomi-NPP'] = 'npp'
90
# SATELLITE_NAME['EOS-Aqua'] = 'eos2'
91
# SATELLITE_NAME['EOS-Terra'] = 'eos1'
92
# SATELLITE_NAME['Metop-A']= 'metop02'
93
# SATELLITE_NAME['Metop-B']= 'metop01'
94
# SATELLITE_NAME['Metop-C']= 'metop03'
95

96
# (platform group, sensor name) pairs, listed in lookup priority order.
_SENSOR_OF_GROUP = (
    (SUPPORTED_AVHRR_SATELLITES, 'avhrr/3'),
    (SUPPORTED_MODIS_SATELLITES, 'modis'),
    (SUPPORTED_VIIRS_SATELLITES, 'viirs'),
    (SUPPORTED_SEVIRI_SATELLITES, 'seviri'),
    (SUPPORTED_METIMAGE_SATELLITES, 'metimage'),
)

# Maps each supported platform name to a one-element list with its sensor name.
SENSOR_LIST = {}
for sat in SUPPORTED_PPS_SATELLITES:
    # The first group containing the platform decides the sensor.
    _sensor_name = next(
        (name for group, name in _SENSOR_OF_GROUP if sat in group), None)
    if _sensor_name is not None:
        SENSOR_LIST[sat] = [_sensor_name]

# Sensor name translation table for Metop.
METOP_SENSOR = {
    'amsu-a': 'amsua',
    'avhrr/3': 'avhrr',
    'amsu-b': 'amsub',
    'hirs/4': 'hirs',
}
111

112

113
def run_command(cmdstr):
    """Run a system command and forward its output to the log.

    The command string is split with ``shlex`` and executed without a shell.
    Standard output and standard error are each read by a separate thread
    running ``logreader`` and logged at INFO level.

    Args:
        cmdstr: The command line to execute (converted with ``str``).

    Returns:
        The return code of the finished process.

    Raises:
        OSError: If the process could not be started. The failure is logged
            before the exception is re-raised.
    """
    myargs = shlex.split(str(cmdstr))

    LOG.debug("Command: %s", cmdstr)
    LOG.debug("Command sequence= %s", myargs)
    try:
        proc = Popen(myargs, shell=False, stderr=PIPE, stdout=PIPE)
    except OSError:
        # Popen signals a failed launch with OSError. Re-raise after logging so
        # that execution never continues with an unbound ``proc``.
        LOG.exception("Failed to start command: %s", cmdstr)
        raise

    # Drain both pipes concurrently so that neither can fill up and block the
    # child process.
    out_reader = threading.Thread(
        target=logreader, args=(proc.stdout, LOG.info))
    err_reader = threading.Thread(
        target=logreader, args=(proc.stderr, LOG.info))
    out_reader.start()
    err_reader.start()
    out_reader.join()
    err_reader.join()

    return proc.wait()
4✔
135

136

137
def check_uri(uri):
    """Return the file path of *uri*, checking that it can be reached from this host.

    A list, set or tuple of uris is handled element by element and a list of
    paths is returned. For a single uri with a hostname that does not resolve
    to one of the local IP addresses, the path must at least be accessible
    (``os.stat``), otherwise an ``IOError`` is raised. If the hostname cannot
    be resolved, a warning is logged and the path is returned anyway.
    """
    if isinstance(uri, (list, set, tuple)):
        return [check_uri(item) for item in uri]

    parsed = urlparse(uri)
    try:
        hostname = parsed.hostname
        if hostname and socket.gethostbyname(hostname) not in get_local_ips():
            # Remote host: accept the file only if the path is visible here.
            try:
                os.stat(parsed.path)
            except OSError:
                raise IOError(
                    "Data file %s unaccessible from this host" % uri)
    except socket.gaierror:
        LOG.warning("Couldn't check file location, running anyway")

    return parsed.path
4✔
158

159

160
def get_lvl1c_file_from_msg(msg):
    """Return the path of the level1c file announced in *msg*.

    Only messages of type 'file' are handled. The file location is taken from
    the 'uri' entry of the message data, or from 'destination' joined with
    'uid' when a destination is given. Returns None for other message types,
    or when the uri check fails with an IOError.
    """
    destination = msg.data.get('destination')

    if msg.type != 'file':
        LOG.debug(
            "Ignoring this type of message data: type = " + str(msg.type))
        return None

    if destination is None:
        location = msg.data['uri']
    else:
        location = os.path.join(destination, msg.data['uid'])

    try:
        level1c_files = check_uri([location])
    except IOError:
        LOG.info('One or more files not present on this host!')
        return None

    LOG.debug("files4pps: %s", str(level1c_files))
    return level1c_files[0]
4✔
184

185

186
def check_host_ok(msg):
    """Tell whether the host of *msg* is the local host.

    Returns False only when the host resolves to an IP address that is not one
    of the local ones. If the host check itself fails (no ``host`` attribute
    or the name cannot be resolved), the error is logged and True is returned.
    """
    try:
        host_ip = socket.gethostbyname(msg.host)
        if host_ip in get_local_ips():
            return True
        LOG.warning("Server %s not the current one: %s", str(host_ip), socket.gethostname())
        return False
    except (AttributeError, socket.gaierror) as err:
        LOG.error("Failed checking host! Hostname = %s", socket.gethostname())
        LOG.exception(err)
    # Best effort: a failed check does not block processing.
    return True
×
197

198

199
def ready2run(msg, scene, **kwargs):
    """Check whether pps is ready to run or not.

    Args:
        msg: The incoming message. Its host is verified with
            ``check_host_ok`` and ``msg.data['platform_name']`` must be one of
            ``SUPPORTED_PPS_SATELLITES``.
        scene: Mapping with the key 'file4pps' holding the file to process
            (or None when there is no file).
        **kwargs: Accepted for call compatibility; not used.

    Returns:
        bool: True when the host is ok, a file is available and the platform
        is supported, otherwise False.
    """
    LOG.info("Got message: %s", msg)
    if not check_host_ok(msg):
        return False

    if scene['file4pps'] is None:
        return False

    platform_name = msg.data['platform_name']
    if platform_name not in SUPPORTED_PPS_SATELLITES:
        # Return an explicit False rather than falling through with None.
        LOG.info("Platform %s is not supported by PPS, scene is not processed",
                 platform_name)
        return False

    LOG.info(
        "This is a PPS supported scene. Start the PPS lvl2 processing!")
    # Pass the file name as a logging argument so that the %s placeholder is
    # actually substituted.
    LOG.info("Process the file = %s",
             os.path.basename(scene['file4pps']))

    LOG.debug("Ready to run...")
    return True
4✔
216

217

218
def terminate_process(popen_obj, scene):
    """Kill the process behind *popen_obj* unless it already has a return code.

    The outcome is logged together with *scene* in both cases.
    """
    already_finished = popen_obj.returncode is not None
    if already_finished:
        LOG.info(
            "Process finished before time out - workerScene: " + str(scene))
        return

    popen_obj.kill()
    LOG.info(
        "Process timed out and pre-maturely terminated. Scene: " + str(scene))
227

228

229
def create_pps_call_command(python_exec, pps_script_name, scene):
    """Build the command line for calling the PPS script.

    The result has the form '<python_exec> <pps_script_name> -af <file>' where
    the file is taken from ``scene['file4pps']``.
    """
    command = "{} {} -af {}".format(
        python_exec, pps_script_name, scene['file4pps'])
    LOG.debug("PPS call command: %s", str(command))
    return command
×
235

236

237
def create_xml_timestat_from_lvl1c(pps_file, pps_control_path):
    """Create the XML time statistics file belonging to a lvl1c file.

    The name of the ascii time control file is derived from *pps_file* and
    looked up in *pps_control_path*. Returns the result of
    ``create_xml_timestat_from_ascii`` when that file exists, otherwise an
    empty list.
    """
    txt_time_control = create_pps_file_from_lvl1c(pps_file, pps_control_path, "timectrl", ".txt")
    if not os.path.exists(txt_time_control):
        LOG.warning('No XML Time statistics file created!')
        return []
    return create_xml_timestat_from_ascii(txt_time_control)
4✔
245

246

247
def find_product_statistics_from_lvl1c(scene, pps_control_path):
    """List the product statistics XML files matching the lvl1c file of *scene*.

    A glob pattern is built from ``scene['file4pps']`` with a wildcard name
    tag and the suffix '_statistics.xml'. Returns the matching files, or an
    empty list when a KeyError occurs while building the pattern.
    """
    try:
        stats_pattern = create_pps_file_from_lvl1c(
            scene['file4pps'], pps_control_path, "*", "_statistics.xml")
    except KeyError:
        return []
    return glob(stats_pattern)
4✔
254

255

256
def process_timectrl_xml_from_pps_result_file_with_extension(filename, export_path, extension):
    """Create the XML time statistics file matching a PPS result file.

    The ascii time control file name is derived from *filename* and is
    expected in *export_path*. The XML file gets the same name with '.txt'
    replaced by ``extension + '.xml'``.

    Args:
        filename: Path of the PPS file the time control file name is derived from.
        export_path: Directory of the time control file. When None, the
            directory of *filename* is used.
        extension: Text inserted in front of '.xml' in the output file name.

    Returns:
        The value returned by ``create_xml_timestat_from_ascii``: a list with
        the XML file name, or an empty list.
    """
    if export_path is None:
        export_path = os.path.dirname(filename)
        # Use the logger rather than print() for diagnostics in library code.
        LOG.debug("No export path given, using: %s", export_path)
    timectrl_file = create_pps_file_from_lvl1c(filename, export_path, "timectrl", ".txt")
    timectrl_xmlfile = timectrl_file.replace(".txt", extension + ".xml")
    LOG.debug("Time control file: %s", timectrl_file)
    return create_xml_timestat_from_ascii(timectrl_file, outfile=timectrl_xmlfile)
4✔
265

266

267
def create_pps_file_from_lvl1c(l1c_file_name, pps_control_path, name_tag, file_type):
    """Derive the name of a PPS file from the name of a lvl1c file.

    The base name of *l1c_file_name* is parsed, its name tag and file type are
    replaced by *name_tag* and *file_type*, and the recomposed name is joined
    onto *pps_control_path*.
    """
    from trollsift import compose, parse
    f_pattern = 'S_NWC_{name_tag}_{platform_id}_{orbit_number}_{start_time}Z_{end_time}Z{file_type}'
    fields = parse(f_pattern, os.path.basename(l1c_file_name))
    fields["name_tag"] = name_tag
    fields["file_type"] = file_type
    new_name = compose(f_pattern, fields)
    return os.path.join(pps_control_path, new_name)
4✔
276

277

278
def create_xml_timestat_from_ascii(infile, outfile=None):
    """Create an XML file from an ascii file with PPS time statistics.

    When *outfile* is None the XML name is *infile* with '.txt' replaced by
    '.xml'. Returns a one-element list with the XML file name, or an empty
    list if ``PPSTimeControl`` cannot be imported.
    """
    xml_file = infile.replace('.txt', '.xml') if outfile is None else outfile

    try:
        from pps_time_control import PPSTimeControl
    except ImportError:
        LOG.warning("Failed to import the PPSTimeControl from pps")
        return []

    LOG.info("Time control ascii file: " + str(infile))
    LOG.info("Read time control ascii file and generate XML")
    time_control = PPSTimeControl(infile)
    time_control.sum_up_processing_times()
    try:
        time_control.write_xml(filename=xml_file)
    except Exception as err:  # TypeError as e:
        # NOTE(review): the file name is still returned below when writing failed.
        LOG.warning('Not able to write time control xml file')
        LOG.warning(err)

    # One ascii file gives exactly one xml file.
    return [xml_file]
4✔
298

299

300
def publish_pps_files(input_msg, publish_q, scene, result_files, **kwargs):
    """Publish one message for each of the *result_files*.

    The message data is a copy of ``input_msg.data`` (without 'dataset' and
    'collection') updated with the file location, the sensor, platform name
    and orbit number from *scene*, format information derived from the file
    suffix, and the start and end times parsed from the file name. The
    keyword argument 'station' (default 'unknown') is part of the message
    subject; other keyword arguments are ignored.
    """
    station = kwargs.get('station', 'unknown')

    for result_file in result_files:
        # The true start and end times are taken from the file name.
        filename = os.path.basename(result_file)
        LOG.info("file to publish = %s", str(filename))

        # Try the patterns in turn: single segment, two-part segment, statistics.
        try:
            metadata = parse(PPS_OUT_PATTERN, filename)
        except ValueError:
            try:
                metadata = parse(PPS_OUT_PATTERN_MULTIPLE, filename)
            except ValueError:
                metadata = parse(PPS_STAT_PATTERN, filename)
            else:
                metadata['segment'] = '_'.join([metadata['segment1'],
                                                metadata['segment2']])
                del metadata['segment1'], metadata['segment2']

        endtime = metadata['end_time']
        starttime = metadata['start_time']

        to_send = input_msg.data.copy()
        for unwanted in ('dataset', 'collection'):
            to_send.pop(unwanted, None)
        to_send['uri'] = result_file
        to_send['uid'] = filename
        # Prefer the 'instrument' entry of the scene, fall back on 'sensor'.
        to_send['sensor'] = scene.get('instrument', None) or scene.get('sensor', None)

        to_send['platform_name'] = scene['platform_name']
        to_send['orbit_number'] = scene['orbit_number']
        if result_file.endswith("xml"):
            to_send['format'] = 'PPS-XML'
            to_send['type'] = 'XML'
        elif result_file.endswith("nc"):
            to_send['format'] = 'CF'
            to_send['type'] = 'netCDF4'
        to_send['data_processing_level'] = '2'

        to_send['start_time'] = starttime
        to_send['end_time'] = endtime

        subject = '/'.join(['', to_send['format'],
                            to_send['data_processing_level'],
                            station, 'polar', 'direct_readout', ''])
        pubmsg = Message(subject, "file", to_send).encode()
        LOG.info("Sending: %s", str(pubmsg))
        try:
            publish_q.put(pubmsg)
        except Exception:
            LOG.warning("Failed putting message on the queue, will send it now...")
            publish_q.send(pubmsg)
×
355

356

357
def logreader(stream, log_func):
    """Pass every line of *stream* to *log_func* and close the stream.

    Lines are read with ``readline`` until an empty result signals the end of
    the stream. Each line is stripped of surrounding whitespace before being
    handed to *log_func*.

    Args:
        stream: Object providing ``readline()`` and ``close()``.
        log_func: Callable taking the stripped line as its only argument.
    """
    try:
        while True:
            line = stream.readline()
            if not line:
                break
            log_func(line.strip())
    finally:
        # Close the stream even if log_func raises, so the pipe is not leaked.
        stream.close()
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc