diff --git a/swvo/io/hp/ensemble.py b/swvo/io/hp/ensemble.py index af780d8..d51612b 100755 --- a/swvo/io/hp/ensemble.py +++ b/swvo/io/hp/ensemble.py @@ -7,14 +7,18 @@ import logging import os import warnings +from abc import ABC, abstractmethod from datetime import datetime, timedelta, timezone from pathlib import Path -from typing import Optional +from typing import Iterable, Optional, TypeVar +import numpy as np import pandas as pd +Number = TypeVar("Number", int, float) -class HpEnsemble: + +class HpEnsemble(ABC): """This is a base class for Hp ensemble data. Parameters @@ -96,16 +100,61 @@ def read(self, start_time: datetime, end_time: datetime) -> list[pd.DataFrame]: start_date = start_time.replace(microsecond=0, minute=0, second=0) str_date = start_date.strftime("%Y%m%dT%H0000") - file_list_new_name = sorted( + + file_list = self._ensemble_file_list(str_date) + data = [] + + if len(file_list) == 0: + msg = f"No {self.index} ensemble file found for requested date {start_date}" + logging.error(msg) + raise FileNotFoundError(msg) + + for file in file_list: + hp_df = pd.read_csv(file, names=["t", self.index]) + + hp_df["t"] = pd.to_datetime(hp_df["t"], utc=True) + hp_df.index = hp_df["t"] + hp_df = hp_df.drop(labels=["t"], axis=1) + + hp_df = hp_df.truncate( + before=start_time - timedelta(minutes=int(self.index_number) - 0.01), + after=end_time + timedelta(minutes=int(self.index_number) + 0.01), + ) + + data.append(hp_df) + + return data + + def _ensemble_file_list(self, str_date: str) -> list[Path]: + """Check for the existence of ensemble files for a given date. + + Parameters + ---------- + str_date : str + Date string in the format YYYYMMDDTHH0000. + + Returns + ------- + list[Path] + A list of file paths for the ensemble files. + + Warnings + -------- + DeprecationWarning + Warns if deprecated file naming convention is used. + """ + + file_list_old_name = sorted( self.data_dir.glob(f"FORECAST_{self.index.title()}_{str_date}_ensemble_*.csv"), key=lambda x: int(x.stem.split("_")[-1]), ) - file_list_old_name = sorted( + file_list_new_name = sorted( self.data_dir.glob(f"FORECAST_{self.index.upper()}_SWIFT_DRIVEN_swift_{str_date}_ensemble_*.csv"), key=lambda x: int(x.stem.split("_")[-1]), ) - data = [] + + file_list: list[Path] if len(file_list_new_name) == 0 and len(file_list_old_name) == 0: file_list = [] @@ -118,26 +167,129 @@ def read(self, start_time: datetime, end_time: datetime) -> list[pd.DataFrame]: ) file_list = file_list_old_name - if len(file_list) == 0: - msg = f"No {self.index} ensemble file found for requested date {start_date}" + return file_list + + @abstractmethod + def read_with_horizon(self, start_time: datetime, end_time: datetime, horizon: Number) -> list[pd.DataFrame]: + if start_time is not None and not start_time.tzinfo: + start_time = start_time.replace(tzinfo=timezone.utc) + if end_time is not None and not end_time.tzinfo: + end_time = end_time.replace(tzinfo=timezone.utc) + + if not (0 <= horizon <= 72): + raise ValueError("Horizon must be between 0 and 72 hours") + + if self.index == "hp30": + freq = "0.5h" + if horizon % 0.5 != 0: + raise ValueError("Horizon for hp30 must be in 0.5 hour increments") + elif self.index == "hp60": + freq = "1h" + if horizon % 1 != 0: + raise ValueError("Horizon for hp60 must be in 1 hour increments") + + align_start_to_hp_hr = start_time.replace(hour=start_time.hour, minute=0, second=0, microsecond=0) + align_end_to_hp_hr = end_time.replace(hour=end_time.hour, minute=0, second=0, microsecond=0) + + full_time_range = pd.date_range(align_start_to_hp_hr, align_end_to_hp_hr, freq=freq, tz=timezone.utc) + + file_offsets, time_indices = self._get_file_offsets_and_time_indices(full_time_range, horizon) + + max_ensembles = 30 # Maximum number of ensemble files to check + ensemble_dfs = [pd.DataFrame(index=full_time_range) for _ in range(max_ensembles)] + + for file_time, file_offset, iloc in zip(full_time_range, file_offsets, time_indices): + str_date = (file_time + timedelta(hours=file_offset)).strftime("%Y%m%dT%H0000") + file_list = self._ensemble_file_list(str_date) + for ensemble_idx in range(max_ensembles): + df = ensemble_dfs[ensemble_idx] + if ensemble_idx < len(file_list): + data = pd.read_csv( + file_list[ensemble_idx], + names=["Forecast Time", self.index], + parse_dates=["Forecast Time"], + ).iloc[iloc] + data["source"] = str_date + data["Forecast Time"] = data["Forecast Time"].tz_localize("UTC") + df.loc[file_time, "Forecast Time"] = data["Forecast Time"] + df.loc[file_time, self.index] = data[self.index] + df.loc[file_time, "source"] = file_list[ensemble_idx].stem + else: + df.loc[file_time, self.index] = np.nan + + for df in ensemble_dfs: + df["horizon"] = horizon + df.index.name = "Time" + ensemble_dfs = [df for df in ensemble_dfs if not df[self.index].isna().all()] + + if len(ensemble_dfs) == 0: + msg = f"No ensemble data found for the requested period {start_time} to {end_time} and horizon {horizon} hours. Check the data directory {self.data_dir} for available data." logging.error(msg) raise FileNotFoundError(msg) - for file in file_list: - hp_df = pd.read_csv(file, names=["t", self.index]) + return ensemble_dfs - hp_df["t"] = pd.to_datetime(hp_df["t"], utc=True) - hp_df.index = hp_df["t"] - hp_df = hp_df.drop(labels=["t"], axis=1) + def _get_file_offsets_and_time_indices( + self, file_time_range: Iterable, forecast_horizon: float + ) -> tuple[list[int], list[int]]: + """ + Compute file offsets and time indices for a given forecast horizon. - hp_df = hp_df.truncate( - before=start_time - timedelta(minutes=int(self.index_number) - 0.01), - after=end_time + timedelta(minutes=int(self.index_number) + 0.01), - ) + Parameters + ---------- + file_time_range : iterable + Available file time steps. + forecast_horizon : float + Forecast horizon in hours (can be fractional, e.g., 0.5, 1.5, 72.0). - data.append(hp_df) + Returns + ------- + file_offsets : list[int] + Offsets from the given time steps indicating which files to read. + time_indices : list[int] + Time indices to use for each file. + """ + file_offsets = [] + time_indices = [] - return data + for current_time in file_time_range: + current_hour = current_time.hour + current_minute = current_time.minute + current_fractional_hour = current_hour + current_minute / 60.0 + + # Target forecast time (fractional hours from same midnight) + target_fractional_hour = current_fractional_hour + forecast_horizon + + # Find the file base hour (multiple of 3) that contains this target + # Files are grouped by 3s: [0,1,2] use base 0, [3,4,5] use base 3, etc. + # Each file covers 72 hours from its base hour + + # Determine which 3-hour group the target falls into + target_hour_int = int(target_fractional_hour) + target_base_hour = (target_hour_int // 3) * 3 + + if target_fractional_hour <= target_base_hour + 72: + file_base_hour = target_base_hour + else: + # use next group + file_base_hour = target_base_hour + 3 + + # Determine which file in the group (base, base+1, base+2) to use + # Use the file that was created at or before current_time + if current_hour >= file_base_hour: + # Use the latest file in the group that's <= current_hour + file_hour = min(current_hour, file_base_hour + 2) + else: + file_hour = file_base_hour + + file_offset = file_hour - current_hour + file_offsets.append(file_offset) + hours_from_file_start = target_fractional_hour - file_base_hour + resolution = 0.5 if self.index == "hp30" else 1.0 + time_index = int(hours_from_file_start / resolution) + time_indices.append(time_index) + + return file_offsets, time_indices class Hp30Ensemble(HpEnsemble): @@ -154,17 +306,69 @@ class Hp30Ensemble(HpEnsemble): def __init__(self, data_dir: Optional[Path] = None) -> None: super().__init__("hp30", data_dir) + def read_with_horizon(self, start_time: datetime, end_time: datetime, horizon: float) -> list[pd.DataFrame]: + """Read Ensemble Hp30 forecast data for a given time range and forecast horizon. + + Parameters + ---------- + start_time : datetime + Start time of the period for which to read the data. + end_time : datetime + End time of the period for which to read the data. + horizon : float + Forecast horizon (in hours). + + Returns + ------- + list[:class:`pandas.DataFrame`] + A list of data frames containing ensemble data for the requested period. + + Raises + ------ + ValueError + Raises `ValueError` if the horizon is not between 0 and 72 hours. + ValueError + Raises `ValueError` if the horizon is not in 0.5 hour increments. + """ + return super().read_with_horizon(start_time, end_time, horizon) + class Hp60Ensemble(HpEnsemble): - """A class to handle Hp30 ensemble data. + """A class to handle Hp60 ensemble data. Parameters ---------- data_dir : str | Path, optional - Data directory for the Hp30 ensemble data. If not provided, it will be read from the environment variable + Data directory for the Hp60 ensemble data. If not provided, it will be read from the environment variable """ ENV_VAR_NAME = "HP60_ENSEMBLE_FORECAST_DIR" def __init__(self, data_dir: Optional[Path] = None) -> None: super().__init__("hp60", data_dir) + + def read_with_horizon(self, start_time: datetime, end_time: datetime, horizon: int) -> list[pd.DataFrame]: + """Read Ensemble Hp60 forecast data for a given time range and forecast horizon. + + Parameters + ---------- + start_time : datetime + Start time of the period for which to read the data. + end_time : datetime + End time of the period for which to read the data. + horizon : int + Forecast horizon (in hours). + + Returns + ------- + list[:class:`pandas.DataFrame`] + A list of data frames containing ensemble data for the requested period. + + Raises + ------ + ValueError + Raises `ValueError` if the horizon is not between 0 and 72 hours. + ValueError + Raises `ValueError` if the horizon is not in 1 hour increments. + """ + return super().read_with_horizon(start_time, end_time, horizon) diff --git a/swvo/io/kp/ensemble.py b/swvo/io/kp/ensemble.py index f4bd61a..872cac0 100755 --- a/swvo/io/kp/ensemble.py +++ b/swvo/io/kp/ensemble.py @@ -11,7 +11,7 @@ import warnings from datetime import datetime, timedelta, timezone from pathlib import Path -from typing import Optional +from typing import Iterable, Optional import numpy as np import pandas as pd @@ -91,29 +91,10 @@ def read(self, start_time: datetime, end_time: datetime) -> list: start_time = start_time.replace(microsecond=0, minute=0, second=0) str_date = start_time.strftime("%Y%m%dT%H0000") - file_list_old_name = sorted( - self.data_dir.glob(f"FORECAST_PAGER_SWIFT_swift_{str_date}_ensemble_*.csv"), - key=lambda x: int(x.stem.split("_")[-1]), - ) + file_list = self.ensemble_file_list(str_date) - file_list_new_name = sorted( - self.data_dir.glob(f"FORECAST_Kp_{str_date}_ensemble_*.csv"), - key=lambda x: int(x.stem.split("_")[-1]), - ) data = [] - if len(file_list_new_name) == 0 and len(file_list_old_name) == 0: - file_list = [] - elif len(file_list_new_name) > 0: - file_list = file_list_new_name - elif len(file_list_old_name) > 0: - warnings.warn( - "The use of FORECAST_PAGER_SWIFT_swift_* files is deprecated. However since we still have these files from the PAGER project with this prefix, this will be supported", - DeprecationWarning, - stacklevel=2, - ) - file_list = file_list_old_name - if len(file_list) == 0: msg = f"No ensemble files found for requested date {str_date}" warnings.warn(f"{msg}! Returning NaNs dataframe.") @@ -156,3 +137,146 @@ def read(self, start_time: datetime, end_time: datetime) -> list: data.append(df) return data + + def ensemble_file_list(self, str_date: str) -> list[Path]: + """Check for the existence of ensemble files for a given date. + + Parameters + ---------- + str_date : str + Date string in the format YYYYMMDDTHH0000. + + Returns + ------- + list[Path] + A list of file paths for the ensemble files. + + Warnings + -------- + DeprecationWarning + Warns if deprecated file naming convention is used. + """ + + file_list_old_name = sorted( + self.data_dir.glob(f"FORECAST_PAGER_SWIFT_swift_{str_date}_ensemble_*.csv"), + key=lambda x: int(x.stem.split("_")[-1]), + ) + + file_list_new_name = sorted( + self.data_dir.glob(f"FORECAST_Kp_{str_date}_ensemble_*.csv"), + key=lambda x: int(x.stem.split("_")[-1]), + ) + + file_list: list[Path] + + if len(file_list_new_name) == 0 and len(file_list_old_name) == 0: + file_list = [] + elif len(file_list_new_name) > 0: + file_list = file_list_new_name + elif len(file_list_old_name) > 0: + warnings.warn( + "The use of FORECAST_PAGER_SWIFT_swift_* files is deprecated. However since we still have these files from the PAGER project with this prefix, this will be supported", + DeprecationWarning, + stacklevel=2, + ) + file_list = file_list_old_name + return file_list + + def read_with_horizon(self, start_time: datetime, end_time: datetime, horizon: int) -> list[pd.DataFrame]: + """Read Ensemble Kp forecast data for a given time range and forecast horizon. + + Parameters + ---------- + start_time : datetime + Start time of the period for which to read the data. + end_time : datetime + End time of the period for which to read the data. + horizon : int + Forecast horizon (in hours). + + Returns + ------- + list[:class:`pandas.DataFrame`] + A list of data frames containing ensemble data for the requested period. + + Raises + ------ + ValueError + Raises `ValueError` if the horizon is not between 0 and 72 hours. + """ + if start_time is not None and not start_time.tzinfo: + start_time = start_time.replace(tzinfo=timezone.utc) + if end_time is not None and not end_time.tzinfo: + end_time = end_time.replace(tzinfo=timezone.utc) + + if not (0 <= horizon <= 72): + raise ValueError("Horizon must be between 0 and 72 hours") + + align_start_to_3hr = start_time.replace(hour=(start_time.hour // 3) * 3, minute=0, second=0, microsecond=0) + align_end_to_3hr = end_time.replace(hour=(end_time.hour // 3) * 3, minute=0, second=0, microsecond=0) + + full_time_range = pd.date_range(align_start_to_3hr, align_end_to_3hr, freq="3h", tz=timezone.utc) + + file_offsets, time_indices = self._get_file_offsets_and_time_indices(full_time_range, horizon) + + max_ensembles = 30 # Maximum number of ensemble files to check + ensemble_dfs = [pd.DataFrame(index=full_time_range) for _ in range(max_ensembles)] + + for file_time, file_offset, iloc in zip(full_time_range, file_offsets, time_indices): + str_date = (file_time + timedelta(hours=file_offset)).strftime("%Y%m%dT%H0000") + + file_list = self.ensemble_file_list(str_date) + for ensemble_idx in range(max_ensembles): + df = ensemble_dfs[ensemble_idx] + if ensemble_idx < len(file_list): + data = pd.read_csv( + file_list[ensemble_idx], + names=["Forecast Time", "kp"], + parse_dates=["Forecast Time"], + ).iloc[iloc] + data["source"] = str_date + data["Forecast Time"] = data["Forecast Time"].tz_localize("UTC") + df.loc[file_time, "Forecast Time"] = data["Forecast Time"] + df.loc[file_time, "kp"] = data["kp"] + df.loc[file_time, "source"] = file_list[ensemble_idx].stem + else: + df.loc[file_time, "kp"] = np.nan + + for df in ensemble_dfs: + df["horizon"] = horizon + df.index.name = "Time" + ensemble_dfs = [df for df in ensemble_dfs if not df["kp"].isna().all()] + + return ensemble_dfs + + def _get_file_offsets_and_time_indices( + self, file_time_range: Iterable, forecast_horizon: int + ) -> tuple[list[int], list[int]]: + """ + Compute file offsets and time indices for a given forecast horizon. + + Parameters + ---------- + file_time_range : iterable + Available file time steps. + forecast_horizon : int + Forecast horizon (in hours or chosen unit). + + Returns + ------- + file_offsets : list[int] + Offsets from the given time steps indicating which files to read. + time_indices : list[int] + Time indices to use for each file. + """ + file_offsets = [] + time_indices = [] + + for _ in file_time_range: + file_offset = -(forecast_horizon % 3) + file_offsets.append(file_offset) + + time_index = (forecast_horizon + 2) // 3 + time_indices.append(time_index) + + return file_offsets, time_indices diff --git a/tests/io/dst/test_wdc.py b/tests/io/dst/test_wdc.py index c78cfaf..15076b6 100644 --- a/tests/io/dst/test_wdc.py +++ b/tests/io/dst/test_wdc.py @@ -83,7 +83,7 @@ def test_get_processed_file_list(self, dst_instance): assert len(time_intervals) == 36 def test_download_and_process(self, dst_instance): - dst_instance.download_and_process(datetime(2025, 1, 1), datetime(2025, 2, 1)) + dst_instance.download_and_process(datetime(2025, 8, 1), datetime(2025, 9, 1)) expected_files = list(MOCK_DATA_PATH.glob("WDC_DST_*.csv")) diff --git a/tests/io/hp/test_hp_ensemble.py b/tests/io/hp/test_hp_ensemble.py index 2e5130e..7068779 100644 --- a/tests/io/hp/test_hp_ensemble.py +++ b/tests/io/hp/test_hp_ensemble.py @@ -56,8 +56,8 @@ def test_initialization_without_env_var(self): ): Hp30Ensemble() - def test_invalid_index(self): - with pytest.raises(ValueError, match="Encountered invalid index:.*"): + def test_abc_instantiation(self): + with pytest.raises(TypeError, match="Can't instantiate abstract class*"): HpEnsemble("hp45", data_dir=DATA_DIR) @pytest.mark.parametrize("instance_type,index_name", [("hp30", "hp30"), ("hp60", "hp60")]) @@ -116,3 +116,177 @@ def test_read_with_default_times(self, instance_type="hp30"): assert isinstance(data[0], pd.DataFrame) assert all("hp30" in i.columns for i in data) assert data[0].index.tz == timezone.utc + + def make_csv_file(self, path, filename, times, values, index_name): + """Helper to create a CSV file with time-value pairs.""" + df = pd.DataFrame({"Forecast Time": times, index_name: values}) + file = path / filename + df.to_csv(file, header=False, index=False) + return file + + @pytest.mark.parametrize("instance_type,index_name", [("hp30", "hp30"), ("hp60", "hp60")]) + def test_read_with_horizon_single_file(self, instance_type, index_name): + ensemble_dir = DATA_DIR / instance_type + ensemble_dir.mkdir(exist_ok=True) + instance_class = Hp30Ensemble if instance_type == "hp30" else Hp60Ensemble + instance = instance_class(data_dir=ensemble_dir) + + start = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc) + end = start + timedelta(hours=1) + horizon = 1.0 if instance_type == "hp30" else 1 + + str_date = start.strftime("%Y%m%dT%H0000") + times = pd.date_range(start.replace(tzinfo=None), periods=10, freq=f"{instance.index_number}min") + values = np.arange(10) + + self.make_csv_file( + ensemble_dir, + f"FORECAST_{index_name.title()}_{str_date}_ensemble_0.csv", + times, + values, + index_name, + ) + result = instance.read_with_horizon(start, end, horizon) + + assert isinstance(result, list) + assert all(isinstance(df, pd.DataFrame) for df in result) + assert not result[0][index_name].isna().all() + assert (result[0]["horizon"] == horizon).all() + assert "Forecast Time" in result[0].columns + assert index_name in result[0].columns + assert "source" in result[0].columns + + @pytest.mark.parametrize("instance_type,index_name", [("hp30", "hp30"), ("hp60", "hp60")]) + def test_read_with_horizon_multiple_ensembles(self, instance_type, index_name): + ensemble_dir = DATA_DIR / instance_type + ensemble_dir.mkdir(exist_ok=True) + instance_class = Hp30Ensemble if instance_type == "hp30" else Hp60Ensemble + instance = instance_class(data_dir=ensemble_dir) + + start = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc) + end = start + timedelta(hours=2) + horizon = 2.0 if instance_type == "hp30" else 2 + + str_date = start.strftime("%Y%m%dT%H0000") + times = pd.date_range(start.replace(tzinfo=None), periods=10, freq=f"{instance.index_number}min") + values1 = np.arange(10) + values2 = np.arange(10, 20) + + self.make_csv_file( + ensemble_dir, f"FORECAST_{index_name.title()}_{str_date}_ensemble_0.csv", times, values1, index_name + ) + self.make_csv_file( + ensemble_dir, f"FORECAST_{index_name.title()}_{str_date}_ensemble_1.csv", times, values2, index_name + ) + + result = instance.read_with_horizon(start, end, horizon) + + assert len(result) == 2 + assert all(index_name in df.columns for df in result) + freq = "0.5h" if instance_type == "hp30" else "1h" + expected_range = pd.date_range(start, end, freq=freq, tz=timezone.utc) + for df in result: + assert set(df.index) == set(expected_range) + + @pytest.mark.parametrize("instance_type,index_name", [("hp30", "hp30"), ("hp60", "hp60")]) + def test_read_with_horizon_nan_fill_for_missing_files(self, instance_type, index_name): + ensemble_dir = DATA_DIR / instance_type + ensemble_dir.mkdir(exist_ok=True) + instance_class = Hp30Ensemble if instance_type == "hp30" else Hp60Ensemble + instance = instance_class(data_dir=ensemble_dir) + + start = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc) + end = start + timedelta(hours=2) + horizon = 1.0 if instance_type == "hp30" else 1 + + str_date = start.strftime("%Y%m%dT%H0000") + times = pd.date_range(start.replace(tzinfo=None), periods=10, freq=f"{instance.index_number}min") + values = np.arange(10) + self.make_csv_file( + ensemble_dir, f"FORECAST_{index_name.title()}_{str_date}_ensemble_0.csv", times, values, index_name + ) + + result = instance.read_with_horizon(start, end, horizon) + + assert len(result) >= 1 + assert not result[0][index_name].isna().all() + assert (result[0]["horizon"] == horizon).all() + + @pytest.mark.parametrize("instance_type,index_name", [("hp30", "hp30"), ("hp60", "hp60")]) + def test_read_with_horizon_correct_horizon_selection(self, instance_type, index_name): + ensemble_dir = DATA_DIR / instance_type + ensemble_dir.mkdir(exist_ok=True) + instance_class = Hp30Ensemble if instance_type == "hp30" else Hp60Ensemble + instance = instance_class(data_dir=ensemble_dir) + + start = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc) + end = start + timedelta(hours=2) + + horizons_to_test = [0.5, 1.0, 1.5] if instance_type == "hp30" else [1, 2] + for horizon in horizons_to_test: + for existing_file in ensemble_dir.glob("FORECAST_*.csv"): + existing_file.unlink() + + str_date = start.strftime("%Y%m%dT%H0000") + + times = pd.date_range(start.replace(tzinfo=None), periods=50, freq=f"{instance.index_number}min") + values = np.arange(len(times)) + horizon + + self.make_csv_file( + ensemble_dir, + f"FORECAST_{index_name.title()}_{str_date}_ensemble_0.csv", + times, + values, + index_name, + ) + + result = instance.read_with_horizon(start, end, horizon) + assert len(result) >= 1 + df = result[0] + + assert not df[index_name].isna().all() + assert (df["horizon"] == horizon).all() + + @pytest.mark.parametrize("instance_type", ["hp30", "hp60"]) + def test_read_with_horizon_invalid_horizon(self, instance_type): + ensemble_dir = DATA_DIR / instance_type + ensemble_dir.mkdir(exist_ok=True) + instance_class = Hp30Ensemble if instance_type == "hp30" else Hp60Ensemble + instance = instance_class(data_dir=ensemble_dir) + + start = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc) + end = start + timedelta(hours=1) + + invalid_horizons = [-1, 73, 100] + for horizon in invalid_horizons: + with pytest.raises(ValueError, match="Horizon must be between 0 and 72 hours"): + instance.read_with_horizon(start, end, horizon) + + if instance_type == "hp30": + invalid_increments = [0.3, 1.7, 2.2] + for horizon in invalid_increments: + with pytest.raises(ValueError, match="Horizon for hp30 must be in 0.5 hour increments"): + instance.read_with_horizon(start, end, horizon) + + if instance_type == "hp60": + invalid_increments = [0.5, 1.5, 2.3] + for horizon in invalid_increments: + with pytest.raises(ValueError, match="Horizon for hp60 must be in 1 hour increments"): + instance.read_with_horizon(start, end, horizon) + + @pytest.mark.parametrize("instance_type", ["hp30", "hp60"]) + def test_read_with_horizon_no_files(self, instance_type): + ensemble_dir = DATA_DIR / instance_type + ensemble_dir.mkdir(exist_ok=True) + instance_class = Hp30Ensemble if instance_type == "hp30" else Hp60Ensemble + instance = instance_class(data_dir=ensemble_dir) + + start = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc) + end = start + timedelta(hours=1) + horizon = 1.0 if instance_type == "hp30" else 1 + + for existing_file in ensemble_dir.glob("FORECAST_*.csv"): + existing_file.unlink() + + with pytest.raises(FileNotFoundError, match="No ensemble data found"): + instance.read_with_horizon(start, end, horizon) diff --git a/tests/io/kp/test_kp_ensemble.py b/tests/io/kp/test_kp_ensemble.py index e035f6e..75cd047 100644 --- a/tests/io/kp/test_kp_ensemble.py +++ b/tests/io/kp/test_kp_ensemble.py @@ -107,3 +107,131 @@ def test_read_with_default_times(self, kp_ensemble_instance): assert isinstance(data[0], pd.DataFrame) assert all("kp" in i.columns for i in data) assert not data[0].empty + + def make_csv_file(self, path, filename, times, values): + """Helper to create a CSV file with time-value pairs.""" + df = pd.DataFrame({"Forecast Time": times, "kp": values}) + file = path / filename + df.to_csv(file, header=False, index=False) + return file + + def test_read_with_horizon_single_file(self, kp_ensemble_instance, tmp_path): + kp_ensemble_instance.data_dir = tmp_path + + start = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc) + end = start + timedelta(hours=3) + horizon = 3 + + str_date = start.strftime("%Y%m%dT%H0000") + times = pd.date_range(start.replace(tzinfo=None), periods=10, freq="3h") + values = np.arange(10) + + self.make_csv_file( + tmp_path, + f"FORECAST_Kp_{str_date}_ensemble_0.csv", + times, + values, + ) + result = kp_ensemble_instance.read_with_horizon(start, end, horizon) + + assert isinstance(result, list) + assert all(isinstance(df, pd.DataFrame) for df in result) + assert not result[0]["kp"].isna().all() + assert (result[0]["horizon"] == horizon).all() + assert "Forecast Time" in result[0].columns + assert "kp" in result[0].columns + assert "source" in result[0].columns + + def test_read_with_horizon_multiple_ensembles(self, kp_ensemble_instance, tmp_path): + kp_ensemble_instance.data_dir = tmp_path + + start = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc) + end = start + timedelta(hours=6) + horizon = 6 + + str_date = (start + timedelta(hours=-(horizon % 3))).strftime("%Y%m%dT%H0000") + times = pd.date_range(start.replace(tzinfo=None), periods=10, freq="3h") + values1 = np.arange(10) + values2 = np.arange(10, 20) + + self.make_csv_file(tmp_path, f"FORECAST_Kp_{str_date}_ensemble_0.csv", times, values1) + self.make_csv_file(tmp_path, f"FORECAST_Kp_{str_date}_ensemble_1.csv", times, values2) + + result = kp_ensemble_instance.read_with_horizon(start, end, horizon) + + assert len(result) == 2 + assert all("kp" in df.columns for df in result) + for df in result: + assert set(df.index) == set(pd.date_range(start, end, freq="3h", tz=timezone.utc)) + + def test_read_with_horizon_no_files(self, kp_ensemble_instance, tmp_path): + kp_ensemble_instance.data_dir = tmp_path + + start = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc) + end = start + timedelta(hours=3) + horizon = 3 + + result = kp_ensemble_instance.read_with_horizon(start, end, horizon) + + assert result == [] + + def test_read_with_horizon_nan_fill_for_missing_files(self, kp_ensemble_instance, tmp_path): + kp_ensemble_instance.data_dir = tmp_path + + start = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc) + end = start + timedelta(hours=6) + horizon = 3 + + str_date = start.strftime("%Y%m%dT%H0000") + times = pd.date_range(start.replace(tzinfo=None), periods=10, freq="3h") + values = np.arange(10) + self.make_csv_file(tmp_path, f"FORECAST_Kp_{str_date}_ensemble_0.csv", times, values) + + result = kp_ensemble_instance.read_with_horizon(start, end, horizon) + + assert len(result) == 1 + assert not result[0]["kp"].isna().all() + assert (result[0]["horizon"] == horizon).all() + + def test_read_with_horizon_correct_horizon_selection(self, kp_ensemble_instance, tmp_path): + kp_ensemble_instance.data_dir = tmp_path + + start = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc) + end = start + timedelta(hours=6) + + horizons_to_test = [3, 25, 35] + for horizon in horizons_to_test: + file_time = start + file_offset = -(horizon % 3) + str_date = (file_time + timedelta(hours=file_offset)).strftime("%Y%m%dT%H0000") + + times = pd.date_range(start.replace(tzinfo=None), periods=50, freq="3h") + values = np.arange(len(times)) + horizon + + self.make_csv_file( + tmp_path, + f"FORECAST_Kp_{str_date}_ensemble_0.csv", + times, + values, + ) + + result = kp_ensemble_instance.read_with_horizon(start, end, horizon) + assert len(result) >= 1 + df = result[0] + + file_index = (horizon + 2) // 3 + expected_value = values[file_index] + + actual_value = df.loc[start, "kp"] + assert actual_value == expected_value, ( + f"Expected {expected_value} but got {actual_value} for horizon {horizon}" + ) + + def test_read_with_horizon_invalid_horizon(self, kp_ensemble_instance): + start = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc) + end = start + timedelta(hours=3) + invalid_horizons = [-1, 73, 100] + + for horizon in invalid_horizons: + with pytest.raises(ValueError, match="Horizon must be between 0 and 72 hours"): + kp_ensemble_instance.read_with_horizon(start, end, horizon)