• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

CDJellen / ndbc-api / 22607414507

03 Mar 2026 03:49AM UTC coverage: 91.682% (-1.7%) from 93.382%
22607414507

push

github

web-flow
Merge pull request #66 from CDJellen/user/cjellen/pandas-v2.X-deprecations-fixes

Consolidated improvements and fixes in light of pandas V2.X parsing deprecations

12 of 50 new or added lines in 4 files covered. (24.0%)

3 existing lines in 1 file now uncovered.

1521 of 1659 relevant lines covered (91.68%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.39
/ndbc_api/ndbc_api.py
1
"""An API for retrieving data from the NDBC.
2

3
This module defines the `NdbcApi`, the top-level object which creates, handles,
4
caches, parses, and returns NDBC data.
5

6
Example:
7
    ```python3
8
    from ndbc_api import NdbcApi
9
    api = NdbcApi()
10
    available_stations = api.stations()
11
    modes = api.get_modes()
12
    df_stdmet_tplm2 = api.get_data(
13
        'tplm2',
14
        'stdmet',
15
        '2020-01-01',
16
        '2022-01-01',
17
        as_df=True
18
    )
19
    ```
20

21
Attributes:
22
    log (:obj:`logging.Logger`): The logger at which to register HTTP
23
        request and response status codes and headers used for debug
24
        purposes.
25
    headers (dict): The request headers for use in the NDBC API's request
26
        handler.
27
"""
28
import logging
1✔
29
import pickle
1✔
30

31
from concurrent.futures import ThreadPoolExecutor, as_completed
1✔
32
from datetime import datetime, timedelta
1✔
33
from typing import Any, List, Sequence, Tuple, Union, Dict, Optional
1✔
34

35
import xarray
1✔
36
import pandas as pd
1✔
37

38
from .api.handlers.http.data import DataHandler
1✔
39
from .api.handlers.http.stations import StationsHandler
1✔
40
from .config import (DEFAULT_CACHE_LIMIT, HTTP_BACKOFF_FACTOR, HTTP_DEBUG,
1✔
41
                     HTTP_DELAY, HTTP_RETRY, LOGGER_NAME, VERIFY_HTTPS)
42
from .exceptions import (HandlerException, ParserException, RequestException,
1✔
43
                         ResponseException, TimestampException)
44
from .utilities.req_handler import RequestHandler
1✔
45
from .utilities.singleton import Singleton
1✔
46
from .utilities.log_formatter import LogFormatter
1✔
47
from .api.handlers.opendap.data import OpenDapDataHandler
1✔
48
from .utilities.opendap.dataset import concat_datasets, merge_datasets, filter_dataset_by_variable, filter_dataset_by_time_range
1✔
49

50

51
class NdbcApi(metaclass=Singleton):
1✔
52
    """An API for querying the National Data Buoy Center.
53

54
    The `NdbcApi` is metaclassed as a singleton to conserve NDBC resources. It
55
    uses two private handlers to build requests and parse responses to the NDBC
56
    over HTTP(s). It also uses a LRU-cached request handler to execute requests
57
    against the NDBC, logging response statuses as they are executed.
58

59
    Attributes:
60
        logging_level: The `logging.Logger`s log level, 1 if the `debug`
61
            flag is set in the `__init__` method, and 0 otherwise.
62
        cache_limit: The handler's global limit for caching
63
            `NdbcApi` responses. This is implemented as a least-recently
64
            used cache, designed to conserve NDBC resources when querying
65
            measurements for a given station over similar time ranges.
66
        delay: The HTTP(s) request delay parameter, in seconds.
67
        retries: The number of times to retry a request to the NDBC data
68
            service.
69
        backoff_factor: The back-off parameter, used in conjunction with
70
            `retries` to re-attempt requests to the NDBC data service.
71
        verify_https: A flag which indicates whether to attempt requests to the
72
            NDBC data service over HTTP or HTTPS.
73
        debug: A flag for verbose logging and response-level status reporting.
74
            Affects the instance's `logging.Logger` and the behavior of its
75
            private `RequestHandler` instance.
76
    """
77

78
    logger = logging.getLogger(LOGGER_NAME)
1✔
79

80
    def __init__(
        self,
        logging_level: int = logging.WARNING if HTTP_DEBUG else logging.ERROR,
        filename: Any = None,
        cache_limit: int = DEFAULT_CACHE_LIMIT,
        headers: Optional[dict] = None,
        delay: int = HTTP_DELAY,
        retries: int = HTTP_RETRY,
        backoff_factor: float = HTTP_BACKOFF_FACTOR,
        verify_https: bool = VERIFY_HTTPS,
        debug: bool = HTTP_DEBUG,
    ):
        """Initializes the singleton `NdbcApi`, sets associated handlers.

        Args:
            logging_level: Level passed to `configure_logging`.
            filename: Optional log file path passed to `configure_logging`;
                when falsy, logs go to a stream handler instead.
            cache_limit: The cache limit for the request handler.
            headers: Request headers for the request handler. When `None`
                (the default) a fresh empty `dict` is used, so separate calls
                never share one mutable default object.
            delay: The HTTP(s) request delay parameter, in seconds.
            retries: The number of times to retry a request.
            backoff_factor: The back-off parameter used with `retries`.
            verify_https: Passed to the request handler as `verify_https`.
            debug: Passed to the request handler as `debug`.
        """
        self.cache_limit = cache_limit
        # A `{}` default in the signature is evaluated once and shared by
        # every call; build a new dict per call instead.
        self.headers = {} if headers is None else headers
        # Record the handler settings so the handler can later be rebuilt
        # with the same configuration rather than the package defaults.
        self._delay = delay
        self._retries = retries
        self._backoff_factor = backoff_factor
        self._verify_https = verify_https
        self._debug = debug
        self._handler = self._get_request_handler(
            cache_limit=self.cache_limit,
            delay=delay,
            retries=retries,
            backoff_factor=backoff_factor,
            headers=self.headers,
            debug=debug,
            verify_https=verify_https,
        )
        self._stations_api = StationsHandler
        self._data_api = DataHandler
        self._opendap_data_api = OpenDapDataHandler
        self.configure_logging(level=logging_level, filename=filename)
108

109
    def dump_cache(self, dest_fp: Union[str, None] = None) -> Union[dict, None]:
        """Export the request cache as a `dict`, or pickle it to a file.

        Builds a nested mapping of `{station id: {request: cached response}}`
        from every station tracked by the `NdbcApi`'s request handler. When
        `dest_fp` is falsy the mapping is returned; otherwise it is pickled
        to `dest_fp` and nothing is returned.

        Args:
            dest_fp: Optional destination filepath for the pickled cache
                contents.

        Returns:
            The cached request/response pairs as a `dict`, or `None` when a
            `dest_fp` is given.
        """
        snapshot = dict()
        station_ids = [entry.id_ for entry in self._handler.stations]
        station_caches = [
            entry.reqs.cache for entry in self._handler.stations
        ]
        for sid, req_cache in zip(station_ids, station_caches):
            # `.v` holds the cached response for each request key.
            snapshot[sid] = {key: req_cache[key].v for key in req_cache.keys()}
        if not dest_fp:
            return snapshot
        with open(dest_fp, 'wb') as out_file:
            pickle.dump(snapshot, out_file)
140

141
    def clear_cache(self) -> None:
        """Clear the request cache by replacing the request handler.

        The replacement handler keeps the configuration currently in effect:
        - the cache limit and headers reported by the existing handler;
        - the delay/retry/backoff/HTTPS/debug settings recorded on the
          instance, falling back to the package defaults when no settings
          were recorded.

        Clearing the cache therefore does not silently reset user
        configuration.
        """
        new_handler = self._get_request_handler(
            cache_limit=self._handler.get_cache_limit(),
            delay=getattr(self, '_delay', HTTP_DELAY),
            retries=getattr(self, '_retries', HTTP_RETRY),
            backoff_factor=getattr(self, '_backoff_factor',
                                   HTTP_BACKOFF_FACTOR),
            headers=self._handler.get_headers(),
            debug=getattr(self, '_debug', HTTP_DEBUG),
            verify_https=getattr(self, '_verify_https', VERIFY_HTTPS),
        )
        # Swap only once construction succeeded, so a failure above leaves
        # the existing handler in place instead of deleting it first.
        self._handler = new_handler
153

154
    def set_cache_limit(self, new_limit: int) -> None:
        """Set the cache limit for the API's request cache.

        Args:
            new_limit: The new cache limit, forwarded to the request handler.
        """
        self._handler.set_cache_limit(cache_limit=new_limit)
        # Keep the instance attribute in sync with the handler, so that
        # `self.cache_limit` reflects the limit actually in effect (it is
        # also what the handler is rebuilt from elsewhere in this class).
        self.cache_limit = new_limit
157

158
    def get_cache_limit(self) -> int:
        """Return the cache limit reported by the request handler.

        Returns:
            The request handler's current cache limit.
        """
        request_handler = self._handler
        return request_handler.get_cache_limit()
161

162
    def get_headers(self) -> dict:
        """Return the headers the request handler currently uses.

        Returns:
            The headers as reported by the request handler.
        """
        request_handler = self._handler
        return request_handler.get_headers()
165

166
    def update_headers(self, new: dict) -> None:
        """Add new headers to the request handler.

        Args:
            new: The headers to add, forwarded unchanged to the request
                handler's `update_headers`.
        """
        request_handler = self._handler
        request_handler.update_headers(new)
169

170
    def set_headers(self, request_headers: dict) -> None:
        """Replace the request handler's headers with `request_headers`.

        Args:
            request_headers: The new headers, forwarded unchanged to the
                request handler's `set_headers`.
        """
        request_handler = self._handler
        request_handler.set_headers(request_headers)
173

174
    def configure_logging(self, level=logging.WARNING, filename=None) -> None:
        """Configures logging for the NdbcApi.

        Sets the level of the class-level `NdbcApi` logger, then replaces
        every handler attached to it with a single new handler:
        - a `logging.FileHandler` writing to `filename`, when one is given;
        - otherwise a `logging.StreamHandler` using `LogFormatter`.

        Args:
            level (int, optional): The logging level. Defaults to logging.WARNING.
            filename (str, optional): If provided, logs to the specified file.
        """
        self.logger.setLevel(level)

        handler: logging.Handler
        formatter: logging.Formatter

        for old_handler in self.logger.handlers[:]:
            self.logger.removeHandler(old_handler)
            # Release resources held by the removed handler (e.g. the open
            # file of a previous FileHandler); removal alone does not.
            old_handler.close()

        if filename:
            handler = logging.FileHandler(filename)
            formatter = logging.Formatter(
                '[%(asctime)s][%(levelname)s]: %(message)s')
        else:
            handler = logging.StreamHandler()
            formatter = LogFormatter('[%(levelname)s]: %(message)s')

        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
199

200
    def log(self,
            level: int,
            station_id: Union[int, str, None] = None,
            mode: Union[str, None] = None,
            message: Union[str, None] = None,
            **extra_data) -> None:
        """Emit a structured, dict-valued record on the API logger.

        `station_id`, `mode`, and `message` are included only when truthy,
        in that order, followed by any extra keyword arguments.

        Args:
            level (int): The logging level.
            station_id (str, optional): The NDBC station ID.
            mode (str, optional): The data mode.
            message (str, optional): The log message.
            **extra_data: Additional key-value pairs to include in the log.
        """
        named_fields = (
            ('station_id', station_id),
            ('mode', mode),
            ('message', message),
        )
        record = {key: value for key, value in named_fields if value}
        record.update(extra_data)
        self.logger.log(level, record)
225

226
    def stations(self, as_df: bool = True) -> Union[pd.DataFrame, dict]:
        """Get all stations and station metadata from the NDBC.

        Queries the NDBC data service for the currently available data buoys
        (stations), both those maintained by the NDBC and those whose
        measurements are managed by the NDBC. Each station carries its
        realtime data coverage for some common measurements, its latitude and
        longitude, and the NDBC's current status notes.

        Args:
            as_df: Return a `pandas.DataFrame` when `True` (the default),
                otherwise a `dict`.

        Returns:
            The current station data, as a `pandas.DataFrame` or a `dict`
            depending on `as_df`.

        Raises:
            ResponseException: Retrieving or parsing the NDBC response
                failed.
        """
        try:
            raw_stations = self._stations_api.stations(handler=self._handler)
            handled = self._handle_data(raw_stations, as_df, cols=None)
        except (ResponseException, ValueError, KeyError) as err:
            raise ResponseException('Failed to handle returned data.') from err
        return handled
253

254
    def historical_stations(self,
                            as_df: bool = True) -> Union[pd.DataFrame, dict]:
        """Get historical stations and station metadata from the NDBC.

        Queries the NDBC data service for historical data buoys (stations),
        both those maintained by the NDBC and those which are not. Rows are
        returned one per (station, historical deployment) pair, alongside
        their historical data coverage.

        Args:
            as_df: Return a `pandas.DataFrame` when `True` (the default),
                otherwise a `dict`.

        Returns:
            The historical station data, as a `pandas.DataFrame` or a `dict`
            depending on `as_df`.

        Raises:
            ResponseException: Retrieving or parsing the NDBC response
                failed.
        """
        try:
            raw_history = self._stations_api.historical_stations(
                handler=self._handler)
            handled = self._handle_data(raw_history, as_df, cols=None)
        except (ResponseException, ValueError, KeyError) as err:
            raise ResponseException('Failed to handle returned data.') from err
        return handled
281

282
    def nearest_station(
        self,
        lat: Union[str, float, None] = None,
        lon: Union[str, float, None] = None,
    ) -> str:
        """Get nearest station to the specified lat/lon.

        Use the NDBC data service's current station data to determine the
        nearest station to the specified latitude and longitude (either as
        `float` or as DD.dd[E/W] strings).

        Args:
            lat: The latitude of interest, used to determine the closest
                maintained station to the given position. `0`/`0.0` (the
                equator) is a valid value.
            lon: The longitude of interest, used to determine the closest
                maintained station to the given position. `0`/`0.0` (the
                prime meridian) is a valid value.

        Returns:
            The station id (e.g. `'tplm2'` or `'41001'`) of the nearest station
            with active measurements to the specified lat/lon pair.

        Raises:
            ValueError: The latitude and longitude were not both specified
                (either is `None` or an empty string) when querying for the
                closest station.
        """
        # Check for "missing" explicitly: a truthiness test would wrongly
        # reject the numerically valid coordinate 0.
        if any(value is None or (isinstance(value, str) and not value)
               for value in (lat, lon)):
            raise ValueError('lat and lon must be specified.')
        return self._stations_api.nearest_station(
            handler=self._handler, lat=lat, lon=lon)
312

313
    def radial_search(
        self,
        lat: Union[str, float, None] = None,
        lon: Union[str, float, None] = None,
        radius: float = -1,
        units: str = 'km',
    ) -> pd.DataFrame:
        """Get all stations within radius units of the specified lat/lon.

        Use the NDBC data service's current station data to determine the
        stations within radius of the specified latitude and longitude
        (passed either as `float` or as DD.dd[E/W] strings).

        Args:
            lat (float): The latitude of interest, used to determine the
                maintained stations within radius units of the given position.
                `0`/`0.0` (the equator) is a valid value.
            lon (float): The longitude of interest, used to determine the
                maintained stations within radius units of the given position.
                `0`/`0.0` (the prime meridian) is a valid value.
            radius (float): The radius in the specified units to search for
                stations within.
            units (str: 'nm', 'km', or 'mi'): The units of the radius, either
                'nm', 'km', or 'mi'.

        Returns:
            A `pandas.DataFrame` of the stations within the specified radius of
            the given lat/lon pair.

        Raises:
            ValueError: The latitude and longitude were not both specified
                (either is `None` or an empty string). Per the original
                contract, the radius or units being invalid is also reported
                as a `ValueError` (presumably raised by the stations handler;
                not visible here).
        """
        # Check for "missing" explicitly: a truthiness test would wrongly
        # reject the numerically valid coordinate 0.
        if any(value is None or (isinstance(value, str) and not value)
               for value in (lat, lon)):
            raise ValueError('lat and lon must be specified.')
        return self._stations_api.radial_search(
            handler=self._handler, lat=lat, lon=lon, radius=radius, units=units)
349

350
    def station(self,
                station_id: Union[str, int],
                as_df: bool = False) -> Union[pd.DataFrame, dict]:
        """Get metadata for the given station from the NDBC.

        Requests, handles, and parses the station-level metadata the NDBC
        publishes on the station's webpage (status notes, location
        information, inclement weather warnings, and measurement notes).

        Args:
            station_id: The NDBC station ID (e.g. `'tplm2'` or `41001`) for the
                station of interest.
            as_df: Return a `pandas.DataFrame` when `True`; defaults to
                `False`, returning a `dict`.

        Returns:
            The station metadata, as a `dict` or, when `as_df` is `True`, a
            `pandas.DataFrame`.

        Raises:
            ResponseException: Requesting or parsing the station's metadata
                failed.
        """
        station_id = self._parse_station_id(station_id)
        try:
            raw_metadata = self._stations_api.metadata(handler=self._handler,
                                                       station_id=station_id)
            handled = self._handle_data(raw_metadata, as_df, cols=None)
        except (ResponseException, ValueError, KeyError) as err:
            raise ResponseException('Failed to handle returned data.') from err
        return handled
381

382
    def available_realtime(
        self,
        station_id: Union[str, int],
        full_response: bool = False,
        as_df: Optional[bool] = None,
    ) -> Union[List[str], pd.DataFrame, dict]:
        """Get the available realtime modalities for a station.

        Queries the NDBC station webpage for realtime measurements, and their
        links, which are or were available over the last 45 days.

        Args:
            station_id: The NDBC station ID (e.g. `'tplm2'` or `41001`) for the
                station of interest.
            full_response: When `False` (the default), return only the names
                from `get_modes()` that appear in some entry's `'description'`.
                When `True`, return the full handled response, including the
                URL for each data mode.
            as_df: Only used when `full_response` is `True`: return a
                `pandas.DataFrame` if `True`, or a `dict` if `False` or `None`
                (the default).

        Returns:
            A `list` of mode names when `full_response` is `False`; otherwise
            the full realtime listing as a `dict` or `pandas.DataFrame`.

        Raises:
            ResponseException: Requesting or parsing the station's realtime
                listing failed.
        """
        station_id = self._parse_station_id(station_id)
        try:
            realtime_listing = self._stations_api.realtime(
                handler=self._handler, station_id=station_id)
            if full_response:
                return self._handle_data(realtime_listing,
                                         False if as_df is None else as_df,
                                         cols=None)
            parsed = self._handle_data(realtime_listing,
                                       as_df=False,
                                       cols=None)

            # Keep every known mode whose name appears in an entry's
            # description text.
            known_modes = self.get_modes()
            found_modes = {
                candidate
                for entry in parsed
                for candidate in known_modes
                if candidate in parsed[entry]['description']
            }
            return list(found_modes)
        except (ResponseException, ValueError, KeyError) as err:
            raise ResponseException('Failed to handle returned data.') from err
442

443
    def available_historical(self,
                             station_id: Union[str, int],
                             as_df: bool = False) -> Union[pd.DataFrame, dict]:
        """Get the available historical measurements for a station.

        Queries the NDBC station webpage for historical, quality-controlled
        measurements and their associated availability time ranges.

        Args:
            station_id: The NDBC station ID (e.g. `'tplm2'` or `41001`) for the
                station of interest.
            as_df: Return a `pandas.DataFrame` when `True`; defaults to
                `False`, returning a `dict`.

        Returns:
            The available historical measurements and their NDBC data links,
            as a `dict` or, when `as_df` is `True`, a `pandas.DataFrame`.

        Raises:
            ResponseException: Requesting or parsing the station's historical
                listing failed.
        """
        station_id = self._parse_station_id(station_id)
        try:
            raw_history = self._stations_api.historical(handler=self._handler,
                                                        station_id=station_id)
            handled = self._handle_data(raw_history, as_df, cols=None)
        except (ResponseException, ValueError, KeyError) as err:
            raise ResponseException('Failed to handle returned data.') from err
        return handled
473

474
    def get_data(
        self,
        station_id: Union[int, str, None] = None,
        mode: Union[str, None] = None,
        start_time: Union[str, datetime, None] = None,
        end_time: Union[str, datetime, None] = None,
        use_timestamp: bool = True,
        as_df: bool = True,
        cols: Optional[List[str]] = None,
        station_ids: Union[Sequence[Union[int, str]], None] = None,
        modes: Union[List[str], None] = None,
        as_xarray_dataset: bool = False,
        use_opendap: Optional[bool] = None,
    ) -> Union[pd.DataFrame, xarray.Dataset, dict]:
        """Execute data query against the specified NDBC station(s).

        Query the NDBC data service for station-level measurements. The `mode`
        parameter sets the measurement type (e.g. `'stdmet'` for standard
        meteorological data or `'cwind'` for continuous winds data).

        The time range and data columns of interest may also be specified.
        A tailored set of requests is then executed against the NDBC data
        service, and the results are combined into output matching the
        conditions in the method call.

        When returning `pandas.DataFrame` data, each station's identifier is
        added as a `station_id` column.

        Args:
            station_id: The NDBC station ID (e.g. `'tplm2'` or `41001`) for the
                station of interest. For HF radar data, this should be an
                identifier such as `'uswc_6km'` for the US West Coast at 6 km
                resolution.
            station_ids: A list of NDBC station IDs (e.g. `['tplm2', '41001']`)
                for the stations of interest. Not supported for HF radar.
                Duplicate IDs are requested once.
            mode: The data measurement type to query for the station (e.g.
                `'stdmet'` for standard meteorological data, `'cwind'` for
                continuous winds data, or `'hfradar'` for HF radar data).
            modes: A list of data measurement types to query for the stations
                (e.g. `['stdmet', 'cwind']`; `'hfradar'` is not supported).
            start_time: The first timestamp of interest (in UTC) for the data
                query. When `None` (the default), 30 days before the current
                system time at the moment of the call.
            end_time: The last timestamp of interest (in UTC) for the data
                query. When `None` (the default), the current system time at
                the moment of the call.
            use_timestamp: A flag indicating whether to parse the NDBC data
                service column headers as a timestamp, and to use this
                timestamp as the index.
            as_df: Whether to return station-level data as a
                `pandas.DataFrame`. Defaults to `True`; if `False`, a `dict` is
                returned. Ignored when `as_xarray_dataset` is `True`.
            as_xarray_dataset: Whether to return the data as an
                `xarray.Dataset`. Defaults to `False`.
            cols: A list of columns of interest which are selected from the
                available data columns, such that only the desired columns are
                returned. All columns are returned if `None` is specified.
            use_opendap: An alias for `as_xarray_dataset`; takes precedence
                when not `None`.

        Returns:
            The available station(s) measurements for the specified modes,
            time range, and columns, as a `pandas.DataFrame`, `xarray.Dataset`,
            or `dict` depending on `as_df` / `as_xarray_dataset`.

        Raises:
            ValueError: Raised when:
                - both `station_id` and `station_ids` are `None`, or both are
                  not `None`;
                - `station_ids` is empty;
                - both `mode` and `modes` are `None`, or both are not `None`;
                - `modes` is malformed or contains `'hfradar'`.
            RequestException: The specified mode is not available.
            ResponseException: There was an error in executing and parsing the
                required requests against the NDBC data service.
            HandlerException: There was an error in handling the returned data
                as a `dict` or `pandas.DataFrame`.
        """
        if use_opendap is not None:
            as_xarray_dataset = use_opendap

        # DataFrame output and xarray output are mutually exclusive.
        as_df = as_df and not as_xarray_dataset

        # Resolve the default window per call. `datetime.now()` written in
        # the signature is evaluated once at import time and goes stale in
        # long-running processes.
        now = datetime.now()
        if start_time is None:
            start_time = now - timedelta(days=30)
        if end_time is None:
            end_time = now

        # Only format `locals()` when the record would actually be emitted.
        if self.logger.isEnabledFor(logging.DEBUG):
            self.log(logging.DEBUG,
                     message=f"`get_data` called with arguments: {locals()}")

        if station_id is None and station_ids is None:
            raise ValueError('Both `station_id` and `station_ids` are `None`.')
        if station_id is not None and station_ids is not None:
            raise ValueError('`station_id` and `station_ids` cannot both be '
                             'specified.')
        if modes is not None:
            if not isinstance(modes, list):
                raise ValueError('`modes` must be a list of strings.')
            if any(not isinstance(m, str) for m in modes):
                raise ValueError('All elements in `modes` must be strings.')
            if any(m == 'hfradar' for m in modes):
                raise ValueError(
                    'HF radar data cannot be requested with `modes`. '
                    'Please use `mode` to specify a single `hfradar` mode.')
        if mode is None and modes is None:
            raise ValueError('Both `mode` and `modes` are `None`.')
        if mode is not None and modes is not None:
            raise ValueError('`mode` and `modes` cannot both be specified.')

        handle_station_ids: List[Union[int, str]] = []
        handle_modes: List[str] = []

        if station_id is not None:
            handle_station_ids.append(station_id)
        if station_ids is not None:
            handle_station_ids.extend(station_ids)
        if mode is not None:
            handle_modes.append(mode)
        if modes is not None:
            handle_modes.extend(modes)

        if not handle_station_ids:
            # Otherwise this surfaces as an opaque
            # `ThreadPoolExecutor(max_workers=0)` error below.
            raise ValueError('`station_ids` must contain at least one '
                             'station id.')

        # The list of supported modes does not change per iteration.
        available_modes = self.get_modes(use_opendap=as_xarray_dataset)
        for requested_mode in handle_modes:
            if requested_mode not in available_modes:
                raise RequestException(
                    f"Mode {requested_mode} is not available.")

        self.log(logging.INFO,
                 message=(f"Processing request for station_ids "
                          f"{handle_station_ids} and modes "
                          f"{handle_modes}"))

        # Request each distinct station once (first-seen order); repeated
        # ids would only produce a redundant request.
        unique_station_ids = list(dict.fromkeys(handle_station_ids))

        # Maps each mode to the list of per-station results gathered for it.
        accumulated_data: Dict[str, List[Union[pd.DataFrame, xarray.Dataset,
                                               dict]]] = {}
        for current_mode in handle_modes:
            accumulated_data[current_mode] = []

            with ThreadPoolExecutor(
                    max_workers=len(unique_station_ids)) as station_executor:
                # Key by future so each result, including a failure, can be
                # attributed to the station it was submitted for.
                future_to_station = {
                    station_executor.submit(
                        self._handle_get_data,
                        mode=current_mode,
                        station_id=requested_id,
                        start_time=start_time,
                        end_time=end_time,
                        use_timestamp=use_timestamp,
                        as_df=as_df,
                        cols=cols,
                        use_opendap=as_xarray_dataset,
                    ): requested_id
                    for requested_id in unique_station_ids
                }

                for future in as_completed(future_to_station):
                    requested_id = future_to_station[future]
                    try:
                        station_data, result_id = future.result()
                    except (RequestException, ResponseException,
                            HandlerException) as e:
                        # Best effort: one failing station does not abort
                        # the others.
                        self.log(
                            level=logging.WARNING,
                            station_id=requested_id,
                            message=(
                                f"Failed to process request for station_id "
                                f"{requested_id} with error: {e}"))
                        continue
                    self.log(
                        level=logging.DEBUG,
                        station_id=result_id,
                        message=
                        f"Successfully processed request for station_id {result_id}"
                    )
                    if as_df:
                        station_data['station_id'] = result_id
                    accumulated_data[current_mode].append(station_data)
        self.log(logging.INFO, message="Finished processing request.")
        return self._handle_accumulate_data(
            accumulated_data,
            as_xarray_dataset=as_xarray_dataset,
        )
638

639
    def get_modes(self,
                  use_opendap: bool = False,
                  as_xarray_dataset: Optional[bool] = None) -> List[str]:
        """Return the data modes supported by `get_data(...)`.

        The modes are the public (non-underscore) attribute names of the
        relevant data handler class.

        Args:
            use_opendap (bool): Return the modes available for OPeNDAP
                `xarray.Dataset` data instead of the HTTP data handler's.
            as_xarray_dataset (bool): An alias for `use_opendap`; takes
                precedence when not `None`.

        Returns:
            (List[str]) the available modalities.
        """
        if as_xarray_dataset is not None:
            use_opendap = as_xarray_dataset

        source_api = self._opendap_data_api if use_opendap else self._data_api
        return [name for name in vars(source_api) if not name.startswith('_')]
660

661
    @staticmethod
    def save_xarray_dataset(dataset: xarray.Dataset, output_filepath: str,
                            **kwargs) -> None:
        """Write an `xarray.Dataset` as netCDF to a user-specified file path.

        Args:
            dataset: The dataset to write.
            output_filepath: Destination path of the netCDF file.
            **kwargs: Extra keyword arguments forwarded to
                `dataset.to_netcdf`.

        Raises:
            ValueError: If `dataset` is not an `xarray.Dataset`, as can
                happen when `get_data()` returns an empty result.

        Returns:
            None: The dataset is written to disk.
        """
        if isinstance(dataset, xarray.Dataset):
            dataset.to_netcdf(output_filepath, **kwargs)
            return None
        raise ValueError(
            f'Expected an xarray.Dataset, got {type(dataset).__name__}. '
            'This can happen when get_data() returns an empty result. '
            'Check the logs for errors.'
        )
×
682

683
    """ PRIVATE """
1✔
684

685
    def _get_request_handler(
        self,
        cache_limit: int,
        delay: int,
        retries: int,
        backoff_factor: float,
        headers: dict,
        debug: bool,
        verify_https: bool,
    ) -> Any:
        """Construct the `RequestHandler` used by this `NdbcApi` instance.

        A falsy `cache_limit` (e.g. `None` or `0`) falls back to
        `self.cache_limit`. Every other argument is forwarded unchanged,
        together with this instance's `self.log`.
        """
        effective_cache_limit = cache_limit or self.cache_limit
        return RequestHandler(
            cache_limit=effective_cache_limit,
            log=self.log,
            delay=delay,
            retries=retries,
            backoff_factor=backoff_factor,
            headers=headers,
            debug=debug,
            verify_https=verify_https,
        )
706

707
    @staticmethod
1✔
708
    def _parse_station_id(station_id: Union[str, int]) -> str:
1✔
709
        """Parse station id."""
710
        station_id = str(station_id)  # expect string-valued station id
1✔
711
        station_id = station_id.lower()  # expect lowercased station id
1✔
712
        return station_id
1✔
713

714
    @staticmethod
1✔
715
    def _handle_timestamp(timestamp: Union[datetime, str]) -> datetime:
1✔
716
        """Convert the specified timestamp to `datetime.datetime`."""
717
        if isinstance(timestamp, datetime):
1✔
718
            return timestamp
1✔
719
        else:
720
            try:
1✔
721
                return datetime.strptime(timestamp, '%Y-%m-%d')
1✔
722
            except ValueError as e:
1✔
723
                raise TimestampException from e
1✔
724

725
    @staticmethod
1✔
726
    def _enforce_timerange(df: pd.DataFrame, start_time: datetime,
1✔
727
                           end_time: datetime) -> pd.DataFrame:
728
        """Down-select to the data within the specified `datetime` range."""
729
        try:
1✔
730
            df = df.loc[(df.index.values >= pd.Timestamp(start_time)) &
1✔
731
                        (df.index.values <= pd.Timestamp(end_time))]
732
        except ValueError as e:
1✔
733
            raise TimestampException(
1✔
734
                'Failed to enforce `start_time` to `end_time` range.') from e
735
        return df
1✔
736

737
    @staticmethod
1✔
738
    def _handle_data(data: pd.DataFrame,
1✔
739
                     as_df: bool = True,
740
                     cols: List[str] = None) -> Union[pd.DataFrame, dict]:
741
        """Apply column down selection and return format handling."""
742
        if cols:
1✔
743
            try:
1✔
744
                data = data[[*cols]]
1✔
745
            except (KeyError, ValueError) as e:
1✔
746
                raise ParserException(
1✔
747
                    'Failed to parse column selection.') from e
748
        if as_df and isinstance(data, pd.DataFrame):
1✔
749
            return data
1✔
750
        elif isinstance(data, pd.DataFrame) and not as_df:
1✔
751
            return data.to_dict()
1✔
752
        elif as_df:
1✔
753
            try:
1✔
754
                return pd.DataFrame().from_dict(data, orient='index')
1✔
755
            except (NotImplementedError, ValueError, TypeError) as e:
1✔
756
                raise HandlerException(
1✔
757
                    'Failed to convert `pd.DataFrame` to `dict`.') from e
758
        else:
759
            return data
1✔
760

761
    def _handle_accumulate_data(
1✔
762
        self,
763
        accumulated_data: Dict[str, List[Union[pd.DataFrame, dict,
764
                                               xarray.Dataset]]],
765
        as_xarray_dataset: bool = False,
766
    ) -> Union[pd.DataFrame, dict, xarray.Dataset]:
767
        """
768
        Accumulate the data from multiple stations and modes, coalescing
769
        overlapping data.
770
        """
771
        # Prune any modalities that returned no data
772
        for k in list(accumulated_data.keys()):
1✔
773
            if not accumulated_data[k]:
1✔
774
                del accumulated_data[k]
×
775

776
        if not accumulated_data:
1✔
NEW
777
            if as_xarray_dataset:
×
NEW
778
                return xarray.Dataset()
×
UNCOV
779
            return {}
×
780

781
        # Determine return type from the first available data item
782
        first_key = list(accumulated_data.keys())[0]
1✔
783
        first_item = accumulated_data[first_key][0]
1✔
784

785
        return_as_df = isinstance(first_item, pd.DataFrame)
1✔
786
        use_opendap = isinstance(first_item, xarray.Dataset)
1✔
787

788
        # Flatten all data into a single list if df or xarray
789
        if return_as_df or use_opendap:
1✔
790
            data_list = []
1✔
791
            for mode, station_data in accumulated_data.items():
1✔
792
                data_list.extend(station_data)
1✔
793
            
794
            if not data_list:
1✔
795
                return pd.DataFrame() if return_as_df else xarray.Dataset()
×
796
        
797
        else:
798
            # For dict response, return data grouped by modality.
799
            # Coalescence does not apply to this structure.
800
            return accumulated_data
×
801

802
        if return_as_df:
1✔
803
            df = pd.concat(data_list, axis=0)
1✔
804
            if df.empty:
1✔
805
                return df
×
806
            
807
            df.reset_index(inplace=True, drop=False)
1✔
808
            
809
            # Group by the intended index to merge rows for the same timestamp
810
            index_cols = ['timestamp', 'station_id']
1✔
811
            
812
            present_index_cols = [col for col in index_cols if col in df.columns]
1✔
813
            if not present_index_cols:
1✔
814
                return df 
×
815

816
            # Aggregate all other columns by taking the first non-null value
817
            agg_cols = [col for col in df.columns if col not in present_index_cols]
1✔
818
            
819
            # Only aggregate if there are columns to aggregate
820
            if agg_cols:
1✔
821
                agg_funcs = {col: 'first' for col in agg_cols}
1✔
822
                df = df.groupby(present_index_cols, as_index=False).agg(agg_funcs)
1✔
823
            else:
824
                df = df.drop_duplicates(subset=present_index_cols)
×
825

826
            df.set_index(present_index_cols, inplace=True)
1✔
827
            # Normalize null representation: concat/groupby may
828
            # introduce None for object-dtype columns.
829
            return df.where(df.notna())
1✔
830

831
        elif use_opendap:
×
832
            # xarray's merge function handles this type of coalescence.
833
            return merge_datasets(data_list)
×
NEW
834
        if as_xarray_dataset:
×
NEW
835
            return xarray.Dataset()
×
UNCOV
836
        return {}
×
837

838
    def _handle_get_data(
1✔
839
        self,
840
        mode: str,
841
        station_id: str,
842
        start_time: datetime,
843
        end_time: datetime,
844
        use_timestamp: bool,
845
        as_df: bool = True,
846
        cols: List[str] = None,
847
        use_opendap: bool = False,
848
    ) -> Tuple[Union[pd.DataFrame, xarray.Dataset, dict], str]:
849
        start_time = self._handle_timestamp(start_time)
1✔
850
        end_time = self._handle_timestamp(end_time)
1✔
851
        station_id = self._parse_station_id(station_id)
1✔
852
        if use_opendap:
1✔
853
            data_api_call = getattr(self._opendap_data_api, mode, None)
×
854
        else:
855
            data_api_call = getattr(self._data_api, mode, None)
1✔
856
        if not data_api_call:
1✔
857
            raise RequestException(
×
858
                'Please supply a supported mode from `get_modes()`.')
859
        try:
1✔
860
            data = data_api_call(
1✔
861
                self._handler,
862
                station_id,
863
                start_time,
864
                end_time,
865
                use_timestamp,
866
            )
867
        except (ResponseException, ValueError, TypeError, KeyError) as e:
×
868
            raise ResponseException(
×
869
                f'Failed to handle API call.\nRaised from {e}') from e
870
        if use_timestamp:
1✔
871
            if use_opendap:
1✔
872
                data = filter_dataset_by_time_range(data, start_time, end_time)
×
873
            else:
874
                data = self._enforce_timerange(df=data,
1✔
875
                                               start_time=start_time,
876
                                               end_time=end_time)
877
        try:
1✔
878
            if use_opendap:
1✔
879
                if cols:
×
880
                    handled_data = filter_dataset_by_variable(data, cols)
×
881
                else:
882
                    handled_data = data
×
883
            else:
884
                handled_data = self._handle_data(data, as_df, cols)
1✔
885
        except (ValueError, KeyError, AttributeError) as e:
1✔
886
            raise ParserException(
×
887
                f'Failed to handle returned data.\nRaised from {e}') from e
888

889
        return (handled_data, station_id)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc