Source code for nemseer.data_handlers

import logging
from datetime import datetime
from pathlib import Path
from typing import List, Tuple, Union

import pandas as pd
import xarray as xr

from .data import (
    DATETIME_COLS,
    DATETIME_FORMAT,
    FORECASTED_COL,
    ID_COLS,
    RUNTIME_COL,
    TYPE_COLS,
)

logger = logging.getLogger(__name__)


def _parse_datetime_cols(df: pd.DataFrame) -> pd.DataFrame:
    """Finds datetime columns in the DataFrame and converts them to datetime

    Args:
        df: pandas.DataFrame
    Returns:
        :class:`pandas.DataFrame` with datetime columns converted according to standard
        AEMO
        format
    """
    dt_cols = DATETIME_COLS
    dt_cols_present = dt_cols.intersection(set(df.columns.tolist()))
    for col in dt_cols_present:
        df.loc[:, col] = pd.to_datetime(df[col], format=DATETIME_FORMAT + ":%S")
    return df


def _parse_id_cols(df: pd.DataFrame) -> pd.DataFrame:
    """Finds relevant ID columns in the DataFrame and converts them to categories

    Args:
        df: pandas.DataFrame
    Returns:
        :class:`pandas.DataFrame` with ID columns converted to categories
    """
    id_cols = {
        "CONSTRAINTID",
        "INTERCONNECTORID",
        "DUID",
        "CONNECTIONPOINTID",
        "PARTICIPANTID",
        "EXPORTGENCONID",
        "IMPORTGENCONID",
        "REGIONID",
    }
    id_cols_present = id_cols.intersection(set(df.columns.tolist()))
    for col in id_cols_present:
        df.loc[:, col] = df[col].astype("category")
    return df


def _parse_predispatch_seq_no(df: pd.DataFrame) -> pd.DataFrame:
    """Parses `PREDISPATCHSEQNO` as datetime and adds `PREDISPATCH_RUN_DATETIME`

    Args:
        df: pandas.DataFrame
    Returns:
        :class:`pandas.DataFrame` with additional column `PREDISPATCH_RUN_DATETIME`
    """
    df["PREDISPATCHSEQNO"] = df["PREDISPATCHSEQNO"].astype(int).astype(str)
    parsed = df["PREDISPATCHSEQNO"].str.extract(r"^([0-9]{8})([0-9]{2})$")
    year_month_day = pd.to_datetime(parsed[0], format="%Y%m%d")
    hour_min = ((parsed[1].astype(int) - 1) * pd.Timedelta(minutes=30)).add(
        pd.Timedelta(hours=4, minutes=30)
    )
    df["PREDISPATCH_RUN_DATETIME"] = year_month_day + hour_min  # type: ignore
    return df


[docs]def clean_forecast_csv(filepath_or_buffer: Union[str, Path]) -> pd.DataFrame: """Given a forecast csv filepath or buffer, reads and cleans the forecast csv. Cleans artefacts in the forecast csv files, including AEMO metadata at start of file and end of report line. Also removes any duplicate rows. Args: filepath_or_buffer: As for :func:`pandas.read_csv` Returns: Cleaned :class:`pandas.DataFrame` with forecast data Warning: Removes duplicate rows. Raises a warning when doing so. """ df = pd.read_csv(filepath_or_buffer, skiprows=1, low_memory=False) # remove end of report line df = df.iloc[0:-1, :] # skip AEMO metadata drop_cols = df.columns.tolist()[0:4] df = df.drop(drop_cols, axis="columns") df = _parse_datetime_cols(df) df = _parse_id_cols(df) if "PREDISPATCHSEQNO" in df.columns: df = _parse_predispatch_seq_no(df) for col in [col for col in df.columns if df.dtypes[col] == "float64"]: df[col] = pd.to_numeric(df[col], downcast="integer") for col in [col for col in df.columns if df.dtypes[col] == "float64"]: df[col] = pd.to_numeric(df[col], downcast="float") if any(dup_df := df.duplicated()): dup_rows = dup_df.loc[dup_df is True] logger.warning( "Duplicate rows detected. Dropping the following rows:\n" + f"{dup_rows}" ) df = df.drop_duplicates() return df
def _filter_on_datetime_col( df: pd.DataFrame, dt_col: str, start: datetime, end: datetime ) -> pd.DataFrame: """Filter the given datetime column based on the supplied start and end times Args: df: pandas.DataFrame dt_col: Datetime columns in :attr:`df` start: Start datetime end: End datetime Returns DataFrame with datetime filtering applied on :attr:`dt_col`. """ df = df.loc[df[dt_col] >= start, :] df = df.loc[df[dt_col] <= end, :] return df
[docs]def apply_run_and_forecasted_time_filters( df: pd.DataFrame, run_start: datetime, run_end: datetime, forecasted_start: datetime, forecasted_end: datetime, forecast_type: str, ) -> pd.DataFrame: """Applies filtering for run times (i.e. :term:`run_start` and :term:`run_end`) and forecasted times (i.e. :term:`forecasted_start` and :term:`forecasted_end`). Datetime filtering is applied to a column fetched from lookup tables that map the relevant column name to each :term:`forecast type`. If the run time/forecasted column obtained from the lookup is not present in the DataFrame, the respective filter is not applied. Args: run_start: Forecast runs at or after this datetime are queried. run_end: Forecast runs before or at this datetime are queried. forecasted_start: Forecasts pertaining to times at or after this datetime are retained. forecasted_end: Forecasts pertaining to times before or at this datetime are retaned. forecast_type: One of :data:`nemseer.forecast_types`. Returns: DataFrame with appropriate datetime filtering applied. """ (runtime_col, forecasted_col) = ( RUNTIME_COL[forecast_type], FORECASTED_COL[forecast_type], ) for (start, end), col in zip( ((run_start, run_end), (forecasted_start, forecasted_end)), (runtime_col, forecasted_col), ): if col in df.columns: df = _filter_on_datetime_col(df, col, start, end) return df
[docs]def to_xarray(df: pd.DataFrame, forecast_type: str): """Converts a :class:`pandas.DataFrame` to a :class:`xarray.Dataset` using nemseer definitions to determine Dataset dimensions. The conversion is processed in chunks. If system memory usage exceeds 95%, the conversion is terminated with a MemoryError. This is more informative than the system killing the Python process. Args: df: pandas.DataFrame to be converted. forecast_type: One of :data:`nemseer.forecast_types`. Returns: :class:`xarray.Dataset`. Warning: Raises a warning when attempting to convert high-dimensional data. """ def _determine_multiindex( df: pd.DataFrame, forecast_type: str ) -> Tuple[pd.MultiIndex, List[str]]: """Ascertains which DataFrame columns are in nemseer's datetime, ID or type column lists and then uses these columns to create a MultiIndex. Args: df: pandas DataFrame. forecast_type: One of :data:`nemseer.forecast_types`. Returns: A MultiIndex that will be parsed as dimensions when converting to :class:`xarray.Dataset`, and a list of the columns that the MultiIndex incorporates. """ multiindex_cols = [] names = [] for name, col in zip( ("run_time", "forecasted_time"), (RUNTIME_COL[forecast_type], FORECASTED_COL[forecast_type]), ): if col in df.columns: names.append(name) multiindex_cols.append(col) for col in [col for col in df.columns if col not in multiindex_cols]: if col in ID_COLS or col in TYPE_COLS: names.append(col) multiindex_cols.append(col) multiindex = pd.MultiIndex.from_frame(df[multiindex_cols], names=names) return multiindex, multiindex_cols def _df_to_xarray(df: pd.DataFrame) -> xr.Dataset: """Reformats supplied DataFrame to a MultiIndexed DataFrame and then converts to :class:`xarray.Dataset`. Args: df: pandas DataFrame. Returns: DataFrame converted to xarray Dataset. """ multiindex, multiindex_cols = _determine_multiindex(df, forecast_type) df = df.set_index(multiindex) df = df.drop(multiindex_cols, axis=1) ds = df.to_xarray() return ds # type: ignore dim_cols = [ col for col in df.columns if ( col in ID_COLS or col in TYPE_COLS or col in RUNTIME_COL[forecast_type] or col in FORECASTED_COL[forecast_type] ) ] if len(dim_cols) >= 5 or any( [col for col in df.columns if len(df[col].unique()) > 300] ): logger.warning( "High-dimensional data. Large datetime requests may result in the Python " + "process being killed by the system" ) if "INTERVENTION" in df.columns and any(df["INTERVENTION"] > 0): logger.warning( "Intervention periods detected. Discarding intervention runs for conversion" + " to xarray. For all data including intervention runs, select conversion" + " to pandas DataFrame." ) df = df[df["INTERVENTION"] == 0] ds = _df_to_xarray(df) return ds