Source code for nemseer.query

import ast
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union

import pyarrow.parquet as pq  # type: ignore
import xarray as xr
from attrs import converters, define, field, validators
from dateutil.relativedelta import relativedelta

from .data import DATETIME_FORMAT, ENUMERATED_TABLES, FORECAST_TYPES

logger = logging.getLogger(__name__)


def _dt_converter(value: str) -> datetime:
    """Convert string to datetime.

    Args:
        value: String with format %Y/%m/%d %H:%M
    Returns:
        Datetime object
    Raises:
        ValueError: If provided datetime string is invalid
    """
    try:
        dt = datetime.strptime(value, DATETIME_FORMAT)
        return dt
    except ValueError:
        try:
            dt = datetime.strptime(value, DATETIME_FORMAT + ":%S")
            if dt.second != 0:
                raise ValueError("If seconds provided in datetime, must be zero.")
            else:
                return dt
        except ValueError:
            raise ValueError(
                "Datetime invalid."
                + " Datetime should be provided as yyyy/mm/dd HH:MM, "
                + " or yyyy/mm/dd HH:MM:00."
            )


def _tablestr_converter(value: Union[str, List[str]]) -> List[str]:
    """Returns a list of table strings, even if a single string is provided

    Args:
        value: Table string or list of table strings
    Returns:
        List of strings
    """
    if type(value) is str:
        return [value]
    else:
        return list(value)


def _validate_run_chronology(instance, attribute, value):
    """Validates run_start against run_end"""
    if value > instance.run_end:
        raise ValueError(
            "Forecast end datetime must be greater than or equal to"
            + " run start datetime."
        )


def _validate_forecasted_chronology(instance, attribute, value):
    """Validated forecasted_start against forecasted_end"""
    if value > instance.forecasted_end:
        raise ValueError(
            "Forecasted end datetime must be greater than or equal to"
            + " forecasted start datetime."
        )


def _validate_relative_chronology(instance, attribute, value) -> None:
    """Validates run_start against forecasted_start"""
    if value > instance.forecasted_start:
        raise ValueError(
            "Forecasted start datetime should be at or after run start datetime."
        )


def _validate_path(instance, attribute, value) -> None:
    """Check the path is a directory and creates it if it is not"""
    if not value.is_dir():
        value.mkdir()
        logger.info(f"Created directory at {value.absolute()}")


def _validate_raw_not_processed(instance, attribute, value) -> None:
    """Check that :attr:`raw_cache` and :attr:`processed_cache` are distinct."""
    if instance.processed_cache:
        if value.absolute() == instance.processed_cache.absolute():
            raise ValueError(
                f"{attribute.name} should be distinct from processed_cache"
            )


def _enumerate_tables(tables: List[str], table_str: str, range_to: int) -> List[str]:
    """Given a table name, populates a list with enumerated table names

    For example, given 'CONSTRAINTSOLUTION' and `range_to`=3, will populate
    `tables` with ['CONSTRAINTSOLUTION1',...,'CONSTRAINTSOLUTION3'].

    Args:
        tables: Table list
        table_str: Table string to enumerate
        range_to: Integer to enumerate to
    Returns:
        `tables` with enumerated `table_str`
    """
    tables.remove(table_str)
    for i in range(1, range_to + 1):
        tables.append(f"{table_str}{i}")
    return tables


def _construct_sqlloader_filename(
    year: int, month: int, forecast_type: str, table: str
) -> str:
    """Constructs filename without file type

    Args:
        year: Year
        month: Month
        forecast_type: One of :data:`nemseer.forecast_types`. See :term:`forecast types`
        table: The name of the table required
    Returns:
        Filename string without file type
    """
    (stryear, strmonth) = (str(year), str(month).rjust(2, "0"))
    if forecast_type == "PREDISPATCH" and table != "MNSPBIDTRK":
        prefix = f"PUBLIC_DVD_{forecast_type}{table}"
    else:
        prefix = f"PUBLIC_DVD_{forecast_type}_{table}"
    fn = prefix + f"_{stryear}{strmonth}010000"
    return fn


[docs]def generate_sqlloader_filenames( run_start: datetime, run_end: datetime, forecast_type: str, tables: List[str], ) -> Dict[Tuple[int, int, str], str]: """Generates MMSDM Historical Data SQLLoader file names based on provided query data Returns a tuple of query metadata (`table`, `year`, `month`) mapped to each filename Args: run_start: Forecast runs at or after this datetime are queried. run_end: Forecast runs before or at this datetime are queried. forecast_type: One of :data:`nemseer.forecast_types`. tables: Table or tables required, provided as a List. Returns: A tuple of query metadata (`table`, `year`, `month`) mapped to each format-agnostic (:term:`SQLLoader`) filename """ def _determine_delta_months(start: datetime, end: datetime): """Determines the widest range of months that encompass :attr:`start` and :attr:`end. Edge cases must be appropriately handled: - 2014/05/31 and 2014/06/01 have relativedelta of 1 day, but two data months (05/2014 and 06/2014) are required. - 2014/05/31 23:00 to 2014/06/01 00:00 only require data for 05/2014 Args: start: datetime of start end: datetime of end Returns delta_months, the total number of months to consider """ MONTH = relativedelta(months=1) full_delta = relativedelta(end, start) delta_months = full_delta.months * MONTH if full_delta.years > 0: delta_months += full_delta.years * MONTH * 12 if ( (start + delta_months).month != end.month and not (end.day == 1 and end.hour == 0 and end.minute == 0) and ( full_delta.days != 0 or full_delta.hours != 0 or full_delta.minutes != 0 ) ): delta_months += MONTH return delta_months.months + (delta_months.years * 12) MONTH = relativedelta(months=1) int_months = _determine_delta_months(run_start, run_end) intervening_dates = [run_start + x * MONTH for x in range(0, int_months + 1)] filename_data = {} for ftype in ENUMERATED_TABLES: if forecast_type == ftype: for table, enumerate_to in ENUMERATED_TABLES[ftype]: if table in tables: tables = _enumerate_tables(tables, table, enumerate_to) for table in tables: for date in intervening_dates: (year, month) = (date.year, date.month) fname = _construct_sqlloader_filename(year, month, forecast_type, table) filename_data[(year, month, table)] = fname return filename_data
[docs]@define class Query: """:class:`Query` validates user inputs and dispatches data downloaders and compilers Construct :class:`Query` using the :meth:`Query.initialise()` constructor. This ensures query metadata is constructed approriately. Query: - Validates user input data - Checks datetimes fit :attr:`yyyy/mm/dd HH:MM` format - Checks datetime chronology (e.g. end is after start) - Checks requested datetimes are valid for each :term:`forecast type` - Validates :term:`forecast type` - Validates user-requested tables against what is available on NEMWeb - Retains query metadata (via constructor class method :meth:`nemseer.query.Query.initialise`) - Can check :attr:`raw_cache` and :attr:`processed_cache` contents to streamline query compilation Attributes: run_start: Forecast runs at or after this datetime are queried. run_end: Forecast runs before or at this datetime are queried. forecasted_start: Forecasts pertaining to times at or after this datetime are retained. forecasted_end: Forecasts pertaining to times before or at this datetime are retaned. forecast_type: One of :data:`nemseer.forecast_types`. tables: Table or tables required. A single table can be supplied as a string. Multiple tables can be supplied as a list of strings. metadata: Metadata dictionary. Constructed by :meth:`Query.initialise()`. raw_cache: Path to build or reuse :term:`raw_cache`. processed_cache (optional): Path to build or reuse :term:`processed_cache`. Should be distinct from :attr:`raw_cache` processed_queries: Defaults to `None` on initialisation. Populated once :meth:`Query.find_table_queries_in_processed_cache` is called. """ run_start: datetime = field( converter=_dt_converter, validator=[_validate_run_chronology, _validate_relative_chronology], ) run_end: datetime = field(converter=_dt_converter) forecasted_start: datetime = field( converter=_dt_converter, validator=[_validate_forecasted_chronology] ) forecasted_end: datetime = field(converter=_dt_converter) forecast_type: str = field(validator=validators.in_(FORECAST_TYPES)) tables: List[str] = field(converter=_tablestr_converter) metadata: Dict[str, str] raw_cache: Path = field( converter=Path, validator=[ _validate_path, _validate_raw_not_processed, ], ) processed_cache: Optional[Path] = field( default=None, converter=converters.optional(Path), validator=validators.optional(_validate_path), ) processed_queries: Union[Dict[str, Path], Dict] = field(default=None)
[docs] @classmethod def initialise( cls, run_start: str, run_end: str, forecasted_start: str, forecasted_end: str, forecast_type: str, tables: Union[str, List[str]], raw_cache: str, processed_cache: Optional[str] = None, ) -> "Query": """Constructor method for :class:`Query`. Assembles query metatdata.""" metadata = { "run_start": run_start, "run_end": run_end, "forecasted_start": forecasted_start, "forecasted_end": forecasted_end, "forecast_type": forecast_type, } return cls( run_start=run_start, # type: ignore run_end=run_end, # type: ignore forecasted_start=forecasted_start, # type: ignore forecasted_end=forecasted_end, # type: ignore forecast_type=forecast_type, # type: ignore tables=tables, # type: ignore metadata=metadata, # type: ignore raw_cache=raw_cache, # type: ignore processed_cache=processed_cache, # type: ignore )
[docs] def check_all_raw_data_in_cache(self) -> bool: """Checks whether *all* requested data is already in the :attr:`raw_cache` as parquet :meth:`nemseer.downloader.ForecastTypeDownloader.download_csv()` handles partial :attr:`raw_cache` completeness If all requested data is already in the :attr:`raw_cache` as parquet, returns True. Otherwise returns False. """ fnames = generate_sqlloader_filenames( self.run_start, self.run_end, self.forecast_type, self.tables ).values() check = [ (self.raw_cache / Path(fname + ".parquet")).exists() for fname in fnames ] if all(check): logger.info(f"Query raw data already downloaded to {self.raw_cache}") return True else: return False
[docs] def find_table_queries_in_processed_cache(self, data_format: str) -> None: """Determines which tables already have queries saved in the :attr:`processed_cache`. If data_format=df, this function will sieve through the metadata of all parquet files in the :attr:`processed_cache`. Note that parquet metadata is UTF-8 encoded. Similarly, data_format=xr will check the metadata of all netCDF files. Modifies :attr:`Query.processed_queries` from :class:`None` to a :class:`dict`. The :class:`dict` is empty if: 1. :attr:`processed_cache` is :class:`None` 2. No portion of the query has been saved in the :attr:`processed_cache` If a portion of the queries are saved in the :attr:`processed_cache`, then :attr:`Query.processed_queries` will be equal to a :class:`dict` that maps the saved query's table name to the saved query's filename. Args: data_format: As per :func:`nemseer.compile_data` """ tables_in_pcache: Union[Dict[str, Path], Dict] = {} if not self.processed_cache: pass else: if data_format == "df": for file in self.processed_cache.glob("*.parquet"): byte_metadata = pq.read_metadata(file).metadata metadata = ast.literal_eval( (byte_metadata["nemseer".encode()]).decode() ) if ( metadata_table := metadata.pop("table") ) in self.tables and metadata == self.metadata: tables_in_pcache[metadata_table] = file else: continue elif data_format == "xr": for file in self.processed_cache.glob("*.nc"): metadata = xr.open_dataset(file).attrs if ( metadata_table := metadata.pop("table") ) in self.tables and metadata == self.metadata: tables_in_pcache[metadata_table] = file else: continue self.processed_queries = tables_in_pcache