CDJellen / ndbc-api / build 18829552396

27 Oct 2025 04:10AM UTC, coverage: 93.382% (-0.1%) from 93.498%

Pull Request #64 (github / web-flow): Fix #63 - Merge columns across modalities
Merge 3393ec083 into 5c0a7026f

28 of 35 new or added lines in 2 files covered (80.0%).
1 existing line in 1 file now uncovered.
1524 of 1632 relevant lines covered (93.38%).
0.93 hits per line.

Source file: /ndbc_api/ndbc_api.py (84.18% covered)
"""An API for retrieving data from the NDBC.

This module defines the `NdbcApi`, the top-level object which creates, handles,
caches, parses, and returns NDBC data.

Example:
    ```python3
    from ndbc_api import NdbcApi
    api = NdbcApi()
    available_stations = api.stations()
    modes = api.get_modes()
    df_stdmet_tplm2 = api.get_data(
        'tplm2',
        'stdmet',
        '2020-01-01',
        '2022-01-01',
        as_df=True
    )
    ```

Attributes:
    log (:obj:`logging.Logger`): The logger used to register HTTP request
        and response status codes and headers, for debugging purposes.
    headers (:obj:`dict`): The request headers for use in the NDBC API's
        request handler.
"""
import logging
import pickle
import warnings
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from typing import Any, List, Sequence, Tuple, Union, Dict, Optional

import xarray
import pandas as pd

from .api.handlers.http.data import DataHandler
from .api.handlers.http.stations import StationsHandler
from .config import (DEFAULT_CACHE_LIMIT, HTTP_BACKOFF_FACTOR, HTTP_DEBUG,
                     HTTP_DELAY, HTTP_RETRY, LOGGER_NAME, VERIFY_HTTPS)
from .exceptions import (HandlerException, ParserException, RequestException,
                         ResponseException, TimestampException)
from .utilities.req_handler import RequestHandler
from .utilities.singleton import Singleton
from .utilities.log_formatter import LogFormatter
from .api.handlers.opendap.data import OpenDapDataHandler
from .utilities.opendap.dataset import (concat_datasets, merge_datasets,
                                        filter_dataset_by_variable,
                                        filter_dataset_by_time_range)


class NdbcApi(metaclass=Singleton):
    """An API for querying the National Data Buoy Center.

    The `NdbcApi` is metaclassed as a singleton to conserve NDBC resources. It
    uses two private handlers to build requests and parse responses to the NDBC
    over HTTP(S). It also uses an LRU-cached request handler to execute
    requests against the NDBC, logging response statuses as they are executed.

    Attributes:
        logging_level: The `logging.Logger`'s log level, 1 if the `debug`
            flag is set in the `__init__` method, and 0 otherwise.
        cache_limit: The handler's global limit for caching
            `NdbcApi` responses. This is implemented as a least-recently
            used cache, designed to conserve NDBC resources when querying
            measurements for a given station over similar time ranges.
        delay: The HTTP(S) request delay parameter, in seconds.
        retries: The number of times to retry a request to the NDBC data
            service.
        backoff_factor: The back-off parameter, used in conjunction with
            `retries` to re-attempt requests to the NDBC data service.
        verify_https: A flag which indicates whether to attempt requests to the
            NDBC data service over HTTP or HTTPS.
        debug: A flag for verbose logging and response-level status reporting.
            Affects the instance's `logging.Logger` and the behavior of its
            private `RequestHandler` instance.
    """

    logger = logging.getLogger(LOGGER_NAME)
    warnings.simplefilter(action='ignore', category=FutureWarning)

    def __init__(
        self,
        logging_level: int = logging.WARNING if HTTP_DEBUG else logging.ERROR,
        filename: Any = None,
        cache_limit: int = DEFAULT_CACHE_LIMIT,
        headers: dict = {},
        delay: int = HTTP_DELAY,
        retries: int = HTTP_RETRY,
        backoff_factor: float = HTTP_BACKOFF_FACTOR,
        verify_https: bool = VERIFY_HTTPS,
        debug: bool = HTTP_DEBUG,
    ):
        """Initializes the singleton `NdbcApi` and sets associated handlers."""
        self.cache_limit = cache_limit
        self.headers = headers
        self._handler = self._get_request_handler(
            cache_limit=self.cache_limit,
            delay=delay,
            retries=retries,
            backoff_factor=backoff_factor,
            headers=self.headers,
            debug=debug,
            verify_https=verify_https,
        )
        self._stations_api = StationsHandler
        self._data_api = DataHandler
        self._opendap_data_api = OpenDapDataHandler
        self.configure_logging(level=logging_level, filename=filename)

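    # Usage sketch (illustrative; the parameter values below are assumptions,
    # not defaults from this module): construct the API with a custom request
    # configuration. Because `NdbcApi` is a singleton, repeated constructor
    # calls return the same instance.
    #
    #   api = NdbcApi(
    #       logging_level=logging.INFO,
    #       cache_limit=64,
    #       delay=2,
    #       retries=3,
    #   )
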
    def dump_cache(self, dest_fp: Union[str, None] = None) -> Union[dict, None]:
        """Dump the request cache to a dict or to the specified filepath.

        Dump the request, response pairs stored in the `NdbcApi`'s
        `RequestHandler` as a `dict`, either returning the object, if no
        `dest_fp` is specified, or serializing (pickling) the object and
        writing it to the specified `dest_fp`.

        Args:
            dest_fp: The destination filepath for the serialized
                `RequestsCache` contents.

        Returns:
            The cached request, response pairs as a `dict`, or `None` if a
            `dest_fp` is specified when calling the method.
        """
        data = dict()
        ids = [r.id_ for r in self._handler.stations]
        caches = [r.reqs.cache for r in self._handler.stations]
        if ids:
            for station_id, cache in zip(ids, caches):
                data[station_id] = dict()
                reqs = cache.keys()
                for req in reqs:
                    resp = cache[req].v
                    data[station_id][req] = resp
        if dest_fp:
            with open(dest_fp, 'wb') as f:
                pickle.dump(data, f)
        else:
            return data

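    # Usage sketch (illustrative; 'cache.pkl' is a hypothetical path): inspect
    # the request cache in memory, or pickle it to disk for a later session.
    #
    #   cached = api.dump_cache()             # returns the cache as a dict
    #   api.dump_cache(dest_fp='cache.pkl')   # pickles the cache, returns None
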
    def clear_cache(self) -> None:
        """Clear the request cache and create a new handler."""
        del self._handler
        self._handler = self._get_request_handler(
            cache_limit=self.cache_limit,
            delay=HTTP_DELAY,
            retries=HTTP_RETRY,
            backoff_factor=HTTP_BACKOFF_FACTOR,
            headers=self.headers,
            debug=HTTP_DEBUG,
            verify_https=VERIFY_HTTPS,
        )

    def set_cache_limit(self, new_limit: int) -> None:
        """Set the cache limit for the API's request cache."""
        self._handler.set_cache_limit(cache_limit=new_limit)

    def get_cache_limit(self) -> int:
        """Get the cache limit for the API's request cache."""
        return self._handler.get_cache_limit()

    def get_headers(self) -> dict:
        """Return the current headers used by the request handler."""
        return self._handler.get_headers()

    def update_headers(self, new: dict) -> None:
        """Add new headers to the request handler."""
        self._handler.update_headers(new)

    def set_headers(self, request_headers: dict) -> None:
        """Reset the request headers using the newly supplied headers."""
        self._handler.set_headers(request_headers)

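    # Usage sketch (illustrative; the header value is an assumption): tune the
    # LRU cache limit and identify your client to the NDBC with a User-Agent
    # header.
    #
    #   api.set_cache_limit(64)
    #   assert api.get_cache_limit() == 64
    #   api.update_headers({'User-Agent': 'my-research-client/0.1'})
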
    def configure_logging(self, level=logging.WARNING, filename=None) -> None:
        """Configures logging for the NdbcApi.

        Args:
            level (int, optional): The logging level. Defaults to
                logging.WARNING.
            filename (str, optional): If provided, logs to the specified file.
        """
        self.logger.setLevel(level)

        handler: logging.Handler
        formatter: logging.Formatter

        for handler in self.logger.handlers[:]:
            self.logger.removeHandler(handler)

        if filename:
            handler = logging.FileHandler(filename)
            formatter = logging.Formatter(
                '[%(asctime)s][%(levelname)s]: %(message)s')
        else:
            handler = logging.StreamHandler()
            formatter = LogFormatter('[%(levelname)s]: %(message)s')

        handler.setFormatter(formatter)
        self.logger.addHandler(handler)

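    # Usage sketch (illustrative; 'ndbc.log' is a hypothetical path): route
    # DEBUG-level messages to a file instead of the default stream handler.
    #
    #   api.configure_logging(level=logging.DEBUG, filename='ndbc.log')
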
    def log(self,
            level: int,
            station_id: Union[int, str, None] = None,
            mode: Union[str, None] = None,
            message: Union[str, None] = None,
            **extra_data) -> None:
        """Logs a structured message with metadata.

        Args:
            level (int): The logging level.
            station_id (str, optional): The NDBC station ID.
            mode (str, optional): The data mode.
            message (str, optional): The log message.
            **extra_data: Additional key-value pairs to include in the log.
        """
        log_data = {}
        if station_id:
            log_data['station_id'] = station_id
        if mode:
            log_data['mode'] = mode
        if message:
            log_data['message'] = message
        for k, v in extra_data.items():
            log_data[k] = v
        self.logger.log(level, log_data)

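    # Usage sketch (illustrative): emit a structured log record; arbitrary
    # keyword arguments are folded into the logged dict alongside the
    # station_id, mode, and message fields.
    #
    #   api.log(logging.INFO, station_id='tplm2', mode='stdmet',
    #           message='fetched', rows=1440)
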
    def stations(self, as_df: bool = True) -> Union[pd.DataFrame, dict]:
        """Get all stations and station metadata from the NDBC.

        Query the NDBC data service for the currently available data buoys
        (stations), both those maintained by the NDBC and those whose
        measurements are managed by the NDBC. Stations are returned by default
        as rows of a `pandas.DataFrame`, alongside their realtime data coverage
        for some common measurements, their latitude and longitude, and current
        station status notes maintained by the NDBC.

        Args:
            as_df: Flag indicating whether to return current station data as a
                `pandas.DataFrame` if set to `True` or as a `dict` if `False`.

        Returns:
            The current station data from the NDBC data service, either as a
            `pandas.DataFrame` or as a `dict` depending on the value of `as_df`.

        Raises:
            ResponseException: An error occurred while retrieving and parsing
                responses from the NDBC data service.
        """
        try:
            data = self._stations_api.stations(handler=self._handler)
            return self._handle_data(data, as_df, cols=None)
        except (ResponseException, ValueError, KeyError) as e:
            raise ResponseException('Failed to handle returned data.') from e

    def historical_stations(self,
                            as_df: bool = True) -> Union[pd.DataFrame, dict]:
        """Get historical stations and station metadata from the NDBC.

        Query the NDBC data service for the historical data buoys (stations),
        both those maintained by the NDBC and those which are not. Stations are
        returned by default as rows of a `pandas.DataFrame`, alongside their
        historical data coverage, with one row per (station, historical
        deployment) tuple.

        Args:
            as_df: Flag indicating whether to return historical station data as
                a `pandas.DataFrame` if set to `True` or as a `dict` if `False`.

        Returns:
            The historical station data from the NDBC data service, either as a
            `pandas.DataFrame` or as a `dict` depending on the value of `as_df`.

        Raises:
            ResponseException: An error occurred while retrieving and parsing
                responses from the NDBC data service.
        """
        try:
            data = self._stations_api.historical_stations(handler=self._handler)
            return self._handle_data(data, as_df, cols=None)
        except (ResponseException, ValueError, KeyError) as e:
            raise ResponseException('Failed to handle returned data.') from e

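    # Usage sketch (illustrative): list current stations as a DataFrame, and
    # historical deployments as a dict keyed per station.
    #
    #   current_df = api.stations()
    #   historical = api.historical_stations(as_df=False)
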
    def nearest_station(
        self,
        lat: Union[str, float, None] = None,
        lon: Union[str, float, None] = None,
    ) -> str:
        """Get the nearest station to the specified lat/lon.

        Use the NDBC data service's current station data to determine the
        nearest station to the specified latitude and longitude (passed either
        as `float` or as DD.dd[E/W] strings).

        Args:
            lat: The latitude of interest, used to determine the closest
                maintained station to the given position.
            lon: The longitude of interest, used to determine the closest
                maintained station to the given position.

        Returns:
            The station id (e.g. `'tplm2'` or `'41001'`) of the nearest station
            with active measurements to the specified lat/lon pair.

        Raises:
            ValueError: The latitude and longitude were not both specified when
                querying for the closest station.
        """
        if lat is None or lon is None:
            raise ValueError('lat and lon must be specified.')
        nearest_station = self._stations_api.nearest_station(
            handler=self._handler, lat=lat, lon=lon)
        return nearest_station

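    # Usage sketch (illustrative coordinates near Chesapeake Bay; signed
    # floats are one of the accepted coordinate forms): find the closest
    # active station to a point.
    #
    #   station_id = api.nearest_station(lat=38.9, lon=-77.0)
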
    def radial_search(
        self,
        lat: Union[str, float, None] = None,
        lon: Union[str, float, None] = None,
        radius: float = -1,
        units: str = 'km',
    ) -> pd.DataFrame:
        """Get all stations within `radius` units of the specified lat/lon.

        Use the NDBC data service's current station data to determine the
        stations within `radius` of the specified latitude and longitude
        (passed either as `float` or as DD.dd[E/W] strings).

        Args:
            lat (float): The latitude of interest, used to determine the
                maintained stations within `radius` units of the given
                position.
            lon (float): The longitude of interest, used to determine the
                maintained stations within `radius` units of the given
                position.
            radius (float): The radius, in the specified units, within which
                to search for stations.
            units (str): The units of the radius, one of `'nm'`, `'km'`, or
                `'mi'`.

        Returns:
            A `pandas.DataFrame` of the stations within the specified radius of
            the given lat/lon pair.

        Raises:
            ValueError: The latitude and longitude were not both specified when
                querying for the closest station, or the radius or units are
                invalid.
        """
        if lat is None or lon is None:
            raise ValueError('lat and lon must be specified.')
        stations_in_radius = self._stations_api.radial_search(
            handler=self._handler, lat=lat, lon=lon, radius=radius, units=units)
        return stations_in_radius

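    # Usage sketch (illustrative coordinates and radius): list all stations
    # within 100 km of a point as a DataFrame.
    #
    #   nearby = api.radial_search(lat=36.6, lon=-122.0, radius=100,
    #                              units='km')
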
    def station(self,
                station_id: Union[str, int],
                as_df: bool = False) -> Union[pd.DataFrame, dict]:
        """Get metadata for the given station from the NDBC.

        The NDBC maintains some station-level metadata, including status notes,
        location information, inclement weather warnings, and measurement
        notes. This method is used to request, handle, and parse the metadata
        for the given station from the station's NDBC webpage.

        Args:
            station_id: The NDBC station ID (e.g. `'tplm2'` or `41001`) for the
                station of interest.
            as_df: Whether to return station-level data as a
                `pandas.DataFrame`; defaults to `False`, in which case a `dict`
                is returned.

        Returns:
            The station metadata for the given station, either as a `dict` or
            as a `pandas.DataFrame` if the `as_df` flag is set to `True`.

        Raises:
            ResponseException: An error occurred when requesting and parsing
                responses for the specified station.
        """
        station_id = self._parse_station_id(station_id)
        try:
            data = self._stations_api.metadata(handler=self._handler,
                                               station_id=station_id)
            return self._handle_data(data, as_df, cols=None)
        except (ResponseException, ValueError, KeyError) as e:
            raise ResponseException('Failed to handle returned data.') from e

    def available_realtime(
        self,
        station_id: Union[str, int],
        full_response: bool = False,
        as_df: Optional[bool] = None,
    ) -> Union[List[str], pd.DataFrame, dict]:
        """Get the available realtime modalities for a station.

        While most data buoy (station) measurements are available over
        multi-year time ranges, some measurements are deprecated or become
        unavailable for substantial periods of time. This method queries the
        NDBC station webpage for those measurements, and their links, which are
        available or were available over the last 45 days.

        Args:
            station_id: The NDBC station ID (e.g. `'tplm2'` or `41001`) for the
                station of interest.
            full_response: Whether to return the full response from the NDBC
                API; defaults to `False`, in which case a list of modes from
                `get_modes()` is returned. If `True`, the full URL for each
                data mode is included in the returned `dict` or
                `pandas.DataFrame`.
            as_df: Whether to return station-level data as a
                `pandas.DataFrame`; defaults to `False`, in which case a `dict`
                is returned.

        Returns:
            The available realtime measurements for the specified station,
            alongside their NDBC data links, either as a `dict` or as a
            `pandas.DataFrame` if the `as_df` flag is set to `True`.

        Raises:
            ResponseException: An error occurred when requesting and parsing
                responses for the specified station.
        """
        station_id = self._parse_station_id(station_id)
        try:
            station_realtime = self._stations_api.realtime(
                handler=self._handler, station_id=station_id)
            full_data = {}
            if full_response:
                if as_df is None:
                    as_df = False
                full_data = self._handle_data(station_realtime,
                                              as_df,
                                              cols=None)
                return full_data
            else:
                full_data = self._handle_data(station_realtime,
                                              as_df=False,
                                              cols=None)

            # Parse the modes from the full response.
            _modes = self.get_modes()
            station_modes = set()
            for k in full_data:
                for m in _modes:
                    if m in full_data[k]['description']:
                        station_modes.add(m)
            return list(station_modes)
        except (ResponseException, ValueError, KeyError) as e:
            raise ResponseException('Failed to handle returned data.') from e

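    # Usage sketch (illustrative): check which modes a station currently
    # serves before requesting data, optionally with the full link table.
    #
    #   modes = api.available_realtime('tplm2')   # e.g. a list like ['stdmet']
    #   links = api.available_realtime('tplm2', full_response=True, as_df=True)
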
    def available_historical(self,
                             station_id: Union[str, int],
                             as_df: bool = False) -> Union[pd.DataFrame, dict]:
        """Get the available historical measurements for a station.

        This method queries the NDBC station webpage for historical,
        quality-controlled measurements and their associated availability time
        ranges.

        Args:
            station_id: The NDBC station ID (e.g. `'tplm2'` or `41001`) for the
                station of interest.
            as_df: Whether to return station-level data as a
                `pandas.DataFrame`; defaults to `False`, in which case a `dict`
                is returned.

        Returns:
            The available historical measurements for the specified station,
            alongside their NDBC data links, either as a `dict` or as a
            `pandas.DataFrame` if the `as_df` flag is set to `True`.

        Raises:
            ResponseException: An error occurred when requesting and parsing
                responses for the specified station.
        """
        station_id = self._parse_station_id(station_id)
        try:
            data = self._stations_api.historical(handler=self._handler,
                                                 station_id=station_id)
            return self._handle_data(data, as_df, cols=None)
        except (ResponseException, ValueError, KeyError) as e:
            raise ResponseException('Failed to handle returned data.') from e

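    # Usage sketch (illustrative): list historical, quality-controlled
    # measurement availability for a station as a DataFrame.
    #
    #   history = api.available_historical('41001', as_df=True)
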
    def get_data(
        self,
        station_id: Union[int, str, None] = None,
        mode: Union[str, None] = None,
        start_time: Union[str, datetime] = datetime.now() - timedelta(days=30),
        end_time: Union[str, datetime] = datetime.now(),
        use_timestamp: bool = True,
        as_df: bool = True,
        cols: List[str] = None,
        station_ids: Union[Sequence[Union[int, str]], None] = None,
        modes: Union[List[str], None] = None,
        as_xarray_dataset: bool = False,
        use_opendap: Optional[bool] = None,
    ) -> Union[pd.DataFrame, xarray.Dataset, dict]:
        """Execute a data query against the specified NDBC station(s).

        Query the NDBC data service for station-level measurements, using the
        `mode` parameter to determine the measurement type (e.g. `'stdmet'` for
        standard meteorological data or `'cwind'` for continuous winds data).
        The time range and data columns of interest may also be specified, such
        that a tailored set of requests is executed against the NDBC data
        service to generate a single `pandas.DataFrame` or `dict` matching the
        conditions specified in the method call. When calling `get_data` with
        `station_ids`, the station identifier is added as a column to the
        returned data.

        Args:
            station_id: The NDBC station ID (e.g. `'tplm2'` or `41001`) for the
                station of interest. For HF radar data, this should be an
                identifier such as `'uswc_6km'` for the US West Coast at 6 km
                resolution.
            station_ids: A list of NDBC station IDs (e.g. `['tplm2', '41001']`)
                for the stations of interest. Not supported for HF radar.
            mode: The data measurement type to query for the station (e.g.
                `'stdmet'` for standard meteorological data, `'cwind'` for
                continuous winds data, or `'hfradar'` for HF radar data).
            modes: A list of data measurement types to query for the stations
                (e.g. `['stdmet', 'cwind']`; `'hfradar'` is not supported).
            start_time: The first timestamp of interest (in UTC) for the data
                query, defaulting to 30 days before the current system time.
            end_time: The last timestamp of interest (in UTC) for the data
                query, defaulting to the current system time.
            use_timestamp: A flag indicating whether to parse the NDBC data
                service column headers as a timestamp, and to use this
                timestamp as the index.
            as_df: Whether to return station-level data as a
                `pandas.DataFrame`; defaults to `True`. If `False` a `dict` is
                returned unless `as_xarray_dataset` is set to `True`.
            as_xarray_dataset: Whether to return the data as an
                `xarray.Dataset`; defaults to `False`.
            cols: A list of columns of interest which are selected from the
                available data columns, such that only the desired columns are
                returned. All columns are returned if `None` is specified.
            use_opendap: An alias for `as_xarray_dataset`.

        Returns:
            The available station(s) measurements for the specified modes, time
            range, and columns, either as a `dict` or as a `pandas.DataFrame`
            if the `as_df` flag is set to `True`.

        Raises:
            ValueError: Both `station_id` and `station_ids` are `None`, or both
                are not `None`. This is also raised if `mode` and `modes` are
                `None`, or both are not `None`.
            RequestException: The specified mode is not available.
            ResponseException: There was an error in executing and parsing the
                required requests against the NDBC data service.
            HandlerException: There was an error in handling the returned data
                as a `dict` or `pandas.DataFrame`.
        """
        if use_opendap is not None:
            as_xarray_dataset = use_opendap

        as_df = as_df and not as_xarray_dataset

        self.log(logging.DEBUG,
                 message=f"`get_data` called with arguments: {locals()}")

        if station_id is None and station_ids is None:
            raise ValueError('Both `station_id` and `station_ids` are `None`.')
        if station_id is not None and station_ids is not None:
            raise ValueError('`station_id` and `station_ids` cannot both be '
                             'specified.')
        if modes is not None:
            if not isinstance(modes, list):
                raise ValueError('`modes` must be a list of strings.')
            if any(not isinstance(m, str) for m in modes):
                raise ValueError('All elements in `modes` must be strings.')
            if any(m == 'hfradar' for m in modes):
                raise ValueError(
                    'HF radar data cannot be requested with `modes`. '
                    'Please use `mode` to specify a single `hfradar` mode.')
        if mode is None and modes is None:
            raise ValueError('Both `mode` and `modes` are `None`.')
        if mode is not None and modes is not None:
            raise ValueError('`mode` and `modes` cannot both be specified.')

        handle_station_ids: List[Union[int, str]] = []
        handle_modes: List[str] = []

        if station_id is not None:
            handle_station_ids.append(station_id)
        if station_ids is not None:
            handle_station_ids.extend(station_ids)
        if mode is not None:
            handle_modes.append(mode)
        if modes is not None:
            handle_modes.extend(modes)

        for mode in handle_modes:
            if mode not in self.get_modes(use_opendap=as_xarray_dataset):
                raise RequestException(f"Mode {mode} is not available.")

        self.log(logging.INFO,
                 message=(f"Processing request for station_ids "
                          f"{handle_station_ids} and modes "
                          f"{handle_modes}"))

        # accumulated_data maps each mode to a list of handled station-level
        # responses; `_handle_get_data` returns a (data, station_id) tuple
        # which is unpacked below.
        accumulated_data: Dict[str, List[Union[pd.DataFrame, dict]]] = {}
        for mode in handle_modes:
            accumulated_data[mode] = []

            with ThreadPoolExecutor(
                    max_workers=len(handle_station_ids)) as station_executor:
                station_futures = {}
                for station_id in handle_station_ids:
                    station_futures[station_id] = station_executor.submit(
                        self._handle_get_data,
                        mode=mode,
                        station_id=station_id,
                        start_time=start_time,
                        end_time=end_time,
                        use_timestamp=use_timestamp,
                        as_df=as_df,
                        cols=cols,
                        use_opendap=as_xarray_dataset,
                    )

                for future in as_completed(station_futures.values()):
                    try:
                        station_data, station_id = future.result()
                        self.log(
                            level=logging.DEBUG,
                            station_id=station_id,
                            message=(f"Successfully processed request for "
                                     f"station_id {station_id}"))
                        if as_df:
                            station_data['station_id'] = station_id
                        accumulated_data[mode].append(station_data)
                    except (RequestException, ResponseException,
                            HandlerException) as e:
                        self.log(
                            level=logging.WARNING,
                            station_id=station_id,
                            message=(
                                f"Failed to process request for station_id "
                                f"{station_id} with error: {e}"))
        self.log(logging.INFO, message="Finished processing request.")
        return self._handle_accumulate_data(accumulated_data)

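    # Usage sketch (illustrative station IDs, dates, and column names): fetch
    # a month of standard meteorological data for two stations in one call;
    # the result is a single DataFrame indexed by (timestamp, station_id)
    # after coalescing in `_handle_accumulate_data`.
    #
    #   df = api.get_data(
    #       station_ids=['tplm2', '41001'],
    #       modes=['stdmet'],
    #       start_time='2022-01-01',
    #       end_time='2022-02-01',
    #       cols=['WSPD', 'WDIR'],
    #   )
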
    def get_modes(self,
                  use_opendap: bool = False,
                  as_xarray_dataset: Optional[bool] = None) -> List[str]:
        """Get the list of supported modes for `get_data(...)`.

        Args:
            use_opendap (bool): Whether to return the available modes for
                OPeNDAP `xarray.Dataset` data.
            as_xarray_dataset (bool): An alias for `use_opendap`.

        Returns:
            (List[str]) The available modalities.
        """
        if as_xarray_dataset is not None:
            use_opendap = as_xarray_dataset

        if use_opendap:
            return [
                v for v in vars(self._opendap_data_api) if not v.startswith('_')
            ]
        return [v for v in vars(self._data_api) if not v.startswith('_')]

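    # Usage sketch (illustrative): compare the modes served over HTTP with
    # those served via OPeNDAP before choosing a return type.
    #
    #   http_modes = api.get_modes()
    #   opendap_modes = api.get_modes(use_opendap=True)
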
    @staticmethod
    def save_xarray_dataset(dataset: xarray.Dataset, output_filepath: str,
                            **kwargs) -> None:
        """Saves an `xarray.Dataset` as netCDF at a user-specified file path.

        Args:
            dataset: The xarray dataset to save.
            output_filepath: The path to save the dataset to.
            **kwargs: Additional keyword arguments to pass to
                `dataset.to_netcdf`.

        Returns:
            None: The dataset is written to disk.
        """
        dataset.to_netcdf(output_filepath, **kwargs)

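    # Usage sketch (illustrative; 'stdmet.nc' is a hypothetical path): persist
    # an OPeNDAP query result to disk as netCDF.
    #
    #   ds = api.get_data('41001', 'stdmet', '2022-01-01', '2022-02-01',
    #                     as_xarray_dataset=True)
    #   NdbcApi.save_xarray_dataset(ds, 'stdmet.nc')
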
    """ PRIVATE """

    def _get_request_handler(
        self,
        cache_limit: int,
        delay: int,
        retries: int,
        backoff_factor: float,
        headers: dict,
        debug: bool,
        verify_https: bool,
    ) -> Any:
        """Build a new `RequestHandler` for the `NdbcApi`."""
        return RequestHandler(
            cache_limit=cache_limit or self.cache_limit,
            log=self.log,
            delay=delay,
            retries=retries,
            backoff_factor=backoff_factor,
            headers=headers,
            debug=debug,
            verify_https=verify_https,
        )

    @staticmethod
    def _parse_station_id(station_id: Union[str, int]) -> str:
        """Parse the station id."""
        station_id = str(station_id)  # expect a string-valued station id
        station_id = station_id.lower()  # expect a lowercased station id
        return station_id

    @staticmethod
    def _handle_timestamp(timestamp: Union[datetime, str]) -> datetime:
        """Convert the specified timestamp to `datetime.datetime`."""
        if isinstance(timestamp, datetime):
            return timestamp
        else:
            try:
                return datetime.strptime(timestamp, '%Y-%m-%d')
            except ValueError as e:
                raise TimestampException from e

    @staticmethod
    def _enforce_timerange(df: pd.DataFrame, start_time: datetime,
                           end_time: datetime) -> pd.DataFrame:
        """Down-select to the data within the specified `datetime` range."""
        try:
            df = df.loc[(df.index.values >= pd.Timestamp(start_time)) &
                        (df.index.values <= pd.Timestamp(end_time))]
        except ValueError as e:
            raise TimestampException(
                'Failed to enforce `start_time` to `end_time` range.') from e
        return df

    @staticmethod
    def _handle_data(data: pd.DataFrame,
                     as_df: bool = True,
                     cols: List[str] = None) -> Union[pd.DataFrame, dict]:
        """Apply column down-selection and return format handling."""
        if cols:
            try:
                data = data[[*cols]]
            except (KeyError, ValueError) as e:
                raise ParserException(
                    'Failed to parse column selection.') from e
        if as_df and isinstance(data, pd.DataFrame):
            return data
        elif isinstance(data, pd.DataFrame) and not as_df:
            return data.to_dict()
        elif as_df:
            try:
                return pd.DataFrame().from_dict(data, orient='index')
            except (NotImplementedError, ValueError, TypeError) as e:
                raise HandlerException(
                    'Failed to convert `dict` to `pd.DataFrame`.') from e
        else:
            return data

    def _handle_accumulate_data(
        self,
        accumulated_data: Dict[str, List[Union[pd.DataFrame, dict,
                                               xarray.Dataset]]],
    ) -> Union[pd.DataFrame, dict, xarray.Dataset]:
        """
        Accumulate the data from multiple stations and modes, coalescing
        overlapping data.
        """
        # Prune any modalities that returned no data.
        for k in list(accumulated_data.keys()):
            if not accumulated_data[k]:
                del accumulated_data[k]

        if not accumulated_data:
            return {}

        # Determine the return type from the first available data item.
        first_key = list(accumulated_data.keys())[0]
        first_item = accumulated_data[first_key][0]

        return_as_df = isinstance(first_item, pd.DataFrame)
        use_opendap = isinstance(first_item, xarray.Dataset)

        # Flatten all data into a single list for DataFrame or xarray returns.
        if return_as_df or use_opendap:
            data_list = []
            for mode, station_data in accumulated_data.items():
                data_list.extend(station_data)

            if not data_list:
                return pd.DataFrame() if return_as_df else xarray.Dataset()

        else:
            # For a dict response, return data grouped by modality.
            # Coalescence does not apply to this structure.
            return accumulated_data

        if return_as_df:
            df = pd.concat(data_list, axis=0)
            if df.empty:
                return df

            df.reset_index(inplace=True, drop=False)

            # Group by the intended index to merge rows for the same timestamp.
            index_cols = ['timestamp', 'station_id']

            present_index_cols = [col for col in index_cols if col in df.columns]
            if not present_index_cols:
                return df

            # Aggregate all other columns by taking the first non-null value.
            agg_cols = [col for col in df.columns if col not in present_index_cols]

            # Only aggregate if there are columns to aggregate.
            if agg_cols:
                agg_funcs = {col: 'first' for col in agg_cols}
                df = df.groupby(present_index_cols, as_index=False).agg(agg_funcs)
            else:
                df = df.drop_duplicates(subset=present_index_cols)

            df.set_index(present_index_cols, inplace=True)
            return df

        elif use_opendap:
            # xarray's merge function handles this type of coalescence.
            return merge_datasets(data_list)
        return {}

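    # Illustrative sketch of the coalescing step above (standalone pandas, not
    # part of this module): rows from two modes that share a (timestamp,
    # station_id) key are merged by taking the first non-null value per
    # column, since the 'first' aggregation skips nulls.
    #
    #   import pandas as pd
    #   a = pd.DataFrame({'timestamp': ['t0'], 'station_id': ['tplm2'],
    #                     'WSPD': [5.0], 'ATMP': [None]})
    #   b = pd.DataFrame({'timestamp': ['t0'], 'station_id': ['tplm2'],
    #                     'WSPD': [None], 'ATMP': [12.1]})
    #   df = pd.concat([a, b], axis=0)
    #   out = df.groupby(['timestamp', 'station_id'],
    #                    as_index=False).agg('first')
    #   # out has one row: WSPD=5.0, ATMP=12.1
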
    def _handle_get_data(
        self,
        mode: str,
        station_id: str,
        start_time: datetime,
        end_time: datetime,
        use_timestamp: bool,
        as_df: bool = True,
        cols: List[str] = None,
        use_opendap: bool = False,
    ) -> Tuple[Union[pd.DataFrame, xarray.Dataset, dict], str]:
        """Fetch, time-filter, and format data for a single station and mode."""
        start_time = self._handle_timestamp(start_time)
        end_time = self._handle_timestamp(end_time)
        station_id = self._parse_station_id(station_id)
        if use_opendap:
            data_api_call = getattr(self._opendap_data_api, mode, None)
        else:
            data_api_call = getattr(self._data_api, mode, None)
        if not data_api_call:
            raise RequestException(
                'Please supply a supported mode from `get_modes()`.')
        try:
            data = data_api_call(
                self._handler,
                station_id,
                start_time,
                end_time,
                use_timestamp,
            )
        except (ResponseException, ValueError, TypeError, KeyError) as e:
            raise ResponseException(
                f'Failed to handle API call.\nRaised from {e}') from e
        if use_timestamp:
            if use_opendap:
                data = filter_dataset_by_time_range(data, start_time, end_time)
            else:
                data = self._enforce_timerange(df=data,
                                               start_time=start_time,
                                               end_time=end_time)
        try:
            if use_opendap:
                if cols:
                    handled_data = filter_dataset_by_variable(data, cols)
                else:
                    handled_data = data
            else:
                handled_data = self._handle_data(data, as_df, cols)
        except (ValueError, KeyError, AttributeError) as e:
            raise ParserException(
                f'Failed to handle returned data.\nRaised from {e}') from e

        return (handled_data, station_id)