Source code for nemseer.nemseer

from typing import Dict, List, Tuple, Union

import pandas as pd
import xarray as xr

from .data_compilers import DataCompiler
from .downloader import ForecastTypeDownloader
from .forecast_type.run_time_generators import generate_runtimes
from .query import Query


def _initiate_downloads_from_query(query: Query, keep_csv: bool = False) -> None:
    """Downloads any raw data a :class:`nemseer.query.Query` is missing.

    No download is started when the query reports that all of its raw data
    is already present in the cache.

    Args:
        query: :class:`nemseer.query.Query`
        keep_csv: Passed through to the downloader's parquet conversion step.
    Returns:
        None
    """
    cache_is_complete = query.check_all_raw_data_in_cache()
    if not cache_is_complete:
        # Only build a downloader when something actually needs fetching.
        fetcher = ForecastTypeDownloader.from_Query(query)
        fetcher.download_csv()
        fetcher.convert_to_parquet(keep_csv=keep_csv)


def download_raw_data(
    forecast_type: str,
    tables: Union[str, List[str]],
    raw_cache: str,
    run_start: Union[str, None] = None,
    run_end: Union[str, None] = None,
    forecasted_start: Union[str, None] = None,
    forecasted_end: Union[str, None] = None,
    keep_csv: bool = False,
) -> None:
    """Downloads raw forecast data from NEMWeb MMSDM Historical Data SQLLoader

    Downloads raw forecast data. Accepts a datetime pair, which can be either
    of:

    1. :attr:`run_start` and :attr:`run_end`
    2. :attr:`forecasted_start` and :attr:`forecasted_end`

    The datetime pair that is not supplied is derived from the one that is,
    and both pairs are then used to initialise the
    :class:`nemseer.query.Query` that drives the download.

    Examples:
        See :ref:`downloading raw data examples <quick_start:downloading raw data>`.

    Arguments:
        forecast_type: One of :data:`nemseer.forecast_types`
        tables: Table or tables required. A single table can be supplied as
            a string. Multiple tables can be supplied as a list of strings.
        raw_cache: Path to create or reuse as :term:`raw_cache`. Files are
            downloaded to this directory and cached data is maintained in the
            parquet format.
        run_start: Forecast runs at or after this datetime are queried.
            If supplied, must be included with :attr:`run_end`.
        run_end: Forecast runs before or at this datetime are queried.
            If supplied, must be included with :attr:`run_start`.
        forecasted_start: Forecasts pertaining to times at or after this
            datetime are retained. If supplied, must be included with
            :attr:`forecasted_end`.
        forecasted_end: Forecasts pertaining to times before or at this
            datetime are retained. If supplied, must be included with
            :attr:`forecasted_start`.
        keep_csv: Default False. If True, downloaded csvs are retained in the
            :term:`raw_cache`.
    Raises:
        ValueError: If a valid pair of datetimes is not supplied, or if more
            than a valid pair of datetimes is supplied.
    """

    def _generate_other_datetime_pair(
        start: str,
        end: str,
        input_datetime_type: str,
        forecast_type: str,
    ) -> Tuple[str, str]:
        """Given forecasted times, generates runtimes and vice versa.

        Args:
            start: Start datetime.
            end: End datetime.
            input_datetime_type: Specifies whether :attr:`start` and
                :attr:`end` correspond to 'run' or 'forecasted' datetimes
            forecast_type: One of :data:`nemseer.forecast_types`
        Returns:
            A tuple of datetime strings that correspond to the "other" set of
            datetimes. For 'forecasted' inputs these come from
            ``generate_runtimes``; for 'run' inputs the supplied
            ``(start, end)`` pair is reused unchanged.
        Raises:
            ValueError: If :attr:`input_datetime_type` does not correspond to
                'run' or 'forecasted'
        """
        if input_datetime_type not in ("run", "forecasted"):
            raise ValueError("Input datetime type must be 'run' or 'forecasted'")
        if input_datetime_type == "forecasted":
            (other_start, other_end) = generate_runtimes(start, end, forecast_type)
        else:
            other_start = start
            # Previously this was `start`, which discarded `end` and collapsed
            # the derived window to a single instant.
            other_end = end
        return other_start, other_end

    # Truthiness (rather than `is not None`) is used so that empty strings are
    # treated the same as omitted arguments.
    if run_start and run_end and not forecasted_start and not forecasted_end:
        forecasted_start, forecasted_end = _generate_other_datetime_pair(
            run_start, run_end, "run", forecast_type
        )
    elif forecasted_start and forecasted_end and not run_start and not run_end:
        run_start, run_end = _generate_other_datetime_pair(
            forecasted_start, forecasted_end, "forecasted", forecast_type
        )
    else:
        raise ValueError(
            "Provide both of run_start and run_end (and no forecasted times),"
            + " or both of forecasted_start and forecasted_end (and no run times)."
        )
    query = Query.initialise(
        run_start=run_start,
        run_end=run_end,
        forecasted_start=forecasted_start,
        forecasted_end=forecasted_end,
        forecast_type=forecast_type,
        tables=tables,
        raw_cache=raw_cache,
    )
    _initiate_downloads_from_query(query, keep_csv=keep_csv)
def compile_data(
    run_start: str,
    run_end: str,
    forecasted_start: str,
    forecasted_end: str,
    forecast_type: str,
    tables: Union[str, List[str]],
    raw_cache: str,
    processed_cache: Union[None, str] = None,
    data_format: str = "df",
) -> Union[Dict[str, pd.DataFrame], Dict[str, xr.Dataset], None]:
    """Compiles queried data from :attr:`raw_cache` and/or :attr:`processed_cache`.

    The steps carried out are:

    1. :attr:`data_format` is validated and a :class:`nemseer.query.Query` is
       initialised from the supplied arguments.
    2. The query looks for table queries in the :term:`processed_cache`.
    3. If the compiler still lists raw tables, any missing raw data is
       downloaded (csvs are not kept) and the raw data is compiled.
    4. Processed data is compiled, and if the compiler has a
       :term:`processed_cache`, the compiler writes to it.

    Examples:
        See :ref:`compiling data examples <quick_start:compiling data>`.

    Arguments:
        run_start: Forecast runs at or after this datetime are queried.
        run_end: Forecast runs before or at this datetime are queried.
        forecasted_start: Forecasts pertaining to times at or after this
            datetime are retained.
        forecasted_end: Forecasts pertaining to times before or at this
            datetime are retained.
        forecast_type: One of :data:`nemseer.forecast_types`
        tables: Table or tables required. A single table can be supplied as
            a string. Multiple tables can be supplied as a list of strings.
        raw_cache: Path to create or reuse as :term:`raw_cache`. Files are
            downloaded to this directory and cached data is maintained in the
            parquet format.
        processed_cache (optional): Path to build or reuse
            :term:`processed_cache`. Should be distinct from :attr:`raw_cache`
        data_format: Default is 'df', which requests
            :class:`pandas.DataFrame` output. Can also request 'xr', which
            requests :class:`xarray.Dataset` output.
    Returns:
        The compiler's ``compiled_data`` (per the return annotation, a dict of
        :class:`pandas.DataFrame` or :class:`xarray.Dataset` objects).
    Raises:
        ValueError: If :attr:`data_format` is neither 'df' nor 'xr'.
    """
    valid_formats = ("df", "xr")
    if data_format not in valid_formats:
        raise ValueError(f"Invalid data format. Formats include: {valid_formats}")

    query = Query.initialise(
        run_start=run_start,
        run_end=run_end,
        forecasted_start=forecasted_start,
        forecasted_end=forecasted_end,
        forecast_type=forecast_type,
        tables=tables,
        raw_cache=raw_cache,
        processed_cache=processed_cache,
    )
    # Must run before the compiler is built from the query.
    query.find_table_queries_in_processed_cache(data_format=data_format)

    data_compiler = DataCompiler.from_Query(query)
    needs_raw_data = bool(data_compiler.raw_tables)
    if needs_raw_data:
        _initiate_downloads_from_query(query, keep_csv=False)
        data_compiler.compile_raw_data(data_format=data_format)
    data_compiler.compile_processed_data(data_format=data_format)

    if data_compiler.processed_cache:
        data_compiler.write_to_processed_cache()
    return data_compiler.compiled_data